Mastering Hadoop

Filtering inputs

Filtering inputs to a job based on certain attributes is often required. Data-level filtering can be done within the Maps, but it is more efficient to filter at the file level, before the Map tasks are spawned. Filtering ensures that only interesting files reach the Map tasks, and it can shorten Map runtimes by eliminating unnecessary file fetches. For example, only files generated within a certain time period might be required for analysis.

Let's use the 441-file subset of the grant proposal corpus to illustrate filtering. We will process only those files whose names match a particular regular expression and whose sizes exceed a minimum. Both criteria are specified as job parameters: filter.name and filter.min.size, respectively. The implementation entails extending the Configured class and implementing the PathFilter interface. The Configured class is the base class for things that can be configured using a Configuration. The PathFilter interface declares a single accept() method, which takes a Path parameter and returns true or false depending on whether the file should be included in the input. The outline of the class is shown in the following snippet:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public static class MasteringHadoopPathAndSizeFilter extends Configured implements PathFilter {
    private Configuration configuration;
    private Pattern filePattern;
    private long filterSize;
    private FileSystem fileSystem;

    @Override
    public boolean accept(Path path){
        //Your accept override implementation goes here
    }

    @Override
    public void setConf(Configuration conf){
        //Your setConf override implementation goes here
    }
}

An important change is to override the setConf() method. This method sets the private Configuration variable and reads the filter properties from it. In the driver class, the job has to be informed about the presence of a filter using the following line:

FileInputFormat.setInputPathFilter(job, MasteringHadoopPathAndSizeFilter.class);
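
For context, a minimal driver sketch follows. This is illustrative rather than the book's exact driver, and it omits the mapper and reducer setup. The GenericOptionsParser call is what copies the -D properties from the command line into the Configuration that the filter's setConf() method later receives:

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    //Copies -D filter.name and -D filter.min.size into the job Configuration
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

    Job job = Job.getInstance(conf, "MasteringHadoopPathAndSizeFilter");
    job.setJarByClass(MasteringHadoopPathAndSizeFilter.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    //Register the filter; it runs before the input splits are computed
    FileInputFormat.setInputPathFilter(job, MasteringHadoopPathAndSizeFilter.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    TextOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
}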

The implementation of the setConf() method is as follows:

@Override
public void setConf(Configuration conf){
    this.configuration = conf;

    if(this.configuration != null){
        String filterRegex = this.configuration.get("filter.name");

        if(filterRegex != null){
            this.filePattern = Pattern.compile(filterRegex);
        }

        String filterSizeString = this.configuration.get("filter.min.size");

        if(filterSizeString != null){
            this.filterSize = Long.parseLong(filterSizeString);
        }

        try{
            this.fileSystem = FileSystem.get(this.configuration);
        }
        catch(IOException ioException){
            //Error handling
        }
    }
}
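
As a side note, Configuration also provides typed getters that fold the null check and the parsing into a single call. The size property, for instance, could be read as shown next (a sketch, treating the default value of 0 as "no size filter"):

this.filterSize = conf.getLong("filter.min.size", 0);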

In the following code, the accept() method returns true for all directories. This matters because the input directory itself is one of the paths handed to accept(), and rejecting it would exclude every file within it. The method uses the Java regular expression classes Pattern and Matcher to determine whether the file path matches the expression and sets a boolean variable accordingly. A second check compares the file size against the size filter. The FileSystem object exposes a getFileStatus() method that returns a FileStatus object, whose file attributes can be examined via getters.

@Override
public boolean accept(Path path){
    boolean isFileAcceptable = true;
    try{
        if(fileSystem.isDirectory(path)){
            return true;
        }

        if(filePattern != null){
            Matcher m = filePattern.matcher(path.toString());
            isFileAcceptable = m.matches();
        }

        if(filterSize > 0){
            long actualFileSize = fileSystem.getFileStatus(path).getLen();
            isFileAcceptable &= (actualFileSize > this.filterSize);
        }
    }
    catch(IOException ioException){
        //Error handling goes here.
    }

    return isFileAcceptable;
}
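
Note that newer Hadoop releases deprecate FileSystem.isDirectory(Path) in favor of inspecting the FileStatus directly. A sketch of the equivalent check (FileStatus lives in org.apache.hadoop.fs):

FileStatus status = fileSystem.getFileStatus(path);
if(status.isDirectory()){
    return true;
}

Fetching the FileStatus once also lets the size check reuse it via status.getLen() rather than making a second getFileStatus() call.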

The following command line accepts files that have a999645 in their names and have sizes greater than 2,500 bytes. If either parameter is omitted, no filter is applied for that attribute.

hadoop jar MasteringHadoop-1.0-SNAPSHOT-jar-with-dependencies.jar -D filter.name=.*a999645.* -D filter.min.size=2500 grant-subset grant-subset-filter
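
The same properties can, of course, be set programmatically in the driver rather than on the command line; a sketch, assuming the driver's Configuration object is in scope:

conf.set("filter.name", ".*a999645.*");
conf.setLong("filter.min.size", 2500);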

Three files pass the filter, and the output is as follows. Note that the filtering happens before the input splits are decided, which is why only three input paths are reported:

14/04/10 21:34:38 INFO input.FileInputFormat: Total input paths to process : 3
14/04/10 21:34:39 INFO mapreduce.JobSubmitter: number of splits:3