
Filtering inputs
Filtering inputs to a job based on certain attributes is often required. Data-level filtering can be done within the Map tasks, but it is more efficient to filter at the file level, before the Map tasks are spawned. Filtering ensures that only interesting files are processed by Map tasks and can have a positive effect on job runtime by eliminating unnecessary file fetches. For example, only files generated within a certain time period might be required for analysis.
Let's use the 441-grant proposal file corpus subset to illustrate filtering. Let's process only those files whose names match a particular regular expression and that have a minimum file size. Both of these are specified as job parameters: filter.name and filter.min.size, respectively. Implementation entails extending the Configured class and implementing the PathFilter interface, as shown in the following snippet. The Configured class is the base class for things that can be configured using a Configuration. The PathFilter interface contains an accept() method. The accept() implementation takes a Path parameter and returns true or false depending on whether the file has to be included in the input or not. The outline of the class is shown in the following snippet:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public static class MasteringHadoopPathAndSizeFilter extends Configured implements PathFilter {
    private Configuration configuration;
    private Pattern filePattern;
    private long filterSize;
    private FileSystem fileSystem;

    @Override
    public boolean accept(Path path){
        //Your accept override implementation goes here
    }

    @Override
    public void setConf(Configuration conf){
        //Your setConf override implementation goes here
    }
}
An important change is to override the setConf() method. This method is used to set the private Configuration variable and to read any filter properties from it. In the driver class, the job has to be informed about the presence of a filter using the following line:
FileInputFormat.setInputPathFilter(job, MasteringHadoopPathAndSizeFilter.class);
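For context, the following minimal driver sketch shows where this call fits and how the -D properties flow in through GenericOptionsParser. The job name, output path handling, and overall structure are illustrative assumptions and should be adapted to your actual driver; the sketch relies on the imports listed in the class outline earlier.

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    //GenericOptionsParser consumes generic options such as -D filter.name=...,
    //leaving only the application arguments (the input and output paths).
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

    Job job = Job.getInstance(conf, "grant-subset-filter"); //job name is illustrative
    job.setJarByClass(MasteringHadoopPathAndSizeFilter.class);

    //Register the path filter; it is applied while input paths are listed,
    //before input splits are computed.
    FileInputFormat.setInputPathFilter(job, MasteringHadoopPathAndSizeFilter.class);

    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    TextOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
}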
The implementation of the setConf() method is as follows:
@Override
public void setConf(Configuration conf){
    this.configuration = conf;
    if(this.configuration != null){
        String filterRegex = this.configuration.get("filter.name");
        if(filterRegex != null){
            this.filePattern = Pattern.compile(filterRegex);
        }
        String filterSizeString = this.configuration.get("filter.min.size");
        if(filterSizeString != null){
            this.filterSize = Long.parseLong(filterSizeString);
        }
        try{
            this.fileSystem = FileSystem.get(this.configuration);
        }
        catch(IOException ioException){
            //Error handling
        }
    }
}
In the following code, the accept() method returns true for all directories. The path of the current directory is one of the paths that will be provided to the accept() method. The method uses the Java regular expression classes Pattern and Matcher to determine whether the file path matches the expression, and sets a Boolean variable appropriately. A second check determines the file size and compares it with the file size filter. The FileSystem object exposes a getFileStatus() method that returns a FileStatus object, whose file attributes can be examined via getters.
@Override
public boolean accept(Path path){
    boolean isFileAcceptable = true;
    try{
        //Directories are always accepted so that the files within them get listed.
        if(fileSystem.isDirectory(path)){
            return true;
        }
        //Name check: the file path must match the supplied regular expression.
        if(filePattern != null){
            Matcher m = filePattern.matcher(path.toString());
            isFileAcceptable = m.matches();
        }
        //Size check: the file must be larger than the supplied minimum size.
        if(filterSize > 0){
            long actualFileSize = fileSystem.getFileStatus(path).getLen();
            if(actualFileSize <= this.filterSize){
                isFileAcceptable = false;
            }
        }
    }
    catch(IOException ioException){
        //Error handling goes here.
    }
    return isFileAcceptable;
}
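Because the filter is an ordinary configured class, it can also be exercised outside a full job run. The following standalone sketch is hypothetical: the property values and the sample path are placeholders, and FileSystem.get() will resolve to whatever file system the Configuration points at (the local file system by default).

public static void checkFilterLocally() {
    Configuration conf = new Configuration();
    conf.set("filter.name", ".*a999645.*");   //placeholder pattern
    conf.set("filter.min.size", "2500");      //placeholder minimum size in bytes

    MasteringHadoopPathAndSizeFilter filter = new MasteringHadoopPathAndSizeFilter();
    //setConf() compiles the pattern, parses the size, and obtains the FileSystem handle.
    filter.setConf(conf);

    Path candidate = new Path("grant-subset/a999645.txt"); //placeholder path
    System.out.println(candidate + " accepted? " + filter.accept(candidate));
}

In a real job, the framework instantiates the filter class via reflection and calls setConf() automatically, because Configured implements the Configurable interface.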
The following command line accepts files that have a999645 in their names and have sizes greater than 2,500 bytes. If either parameter is omitted, no filter is applied for that attribute.
hadoop jar MasteringHadoop-1.0-SNAPSHOT-jar-with-dependencies.jar -D filter.name=.*a999645.* -D filter.min.size=2500 grant-subset grant-subset-filter
Three files pass the test and the output is shown as follows. The filtering happens before the splits are decided.
14/04/10 21:34:38 INFO input.FileInputFormat: Total input paths to process : 3
14/04/10 21:34:39 INFO mapreduce.JobSubmitter: number of splits:3