
The Map task
The efficiency of the Map phase is determined by the characteristics of the job inputs. We saw that having too many small files leads to a proliferation of Map tasks because of the large number of splits. Another important statistic to note is the average runtime of a Map task. Too many or too few Map tasks are both detrimental to job performance; striking a balance between the two is important, and much of it depends on the nature of the application and its data.
The dfs.blocksize attribute
The default block size of files in a cluster can be overridden in the cluster configuration file, hdfs-site.xml, generally present in the etc/hadoop folder of the Hadoop installation. In some cases, a Map task might take only a few seconds to process a block. In such cases, it is better to give a bigger block to each Map task. This can be done in the following ways:
- Increasing the fileinputformat.split.minsize parameter to be greater than the block size
- Increasing the block size of the input file stored in HDFS

The former can lead to locality problems, as an InputSplit might have to import data from blocks residing on other nodes. The latter method preserves locality, but might require you to reload the file into HDFS. This can be done using the following command, where a file tiny.dat.txt is uploaded into HDFS with a block size of 512 MB (536,870,912 bytes). The default block size is 128 MB (64 MB in previous versions).
hadoop fs -D dfs.blocksize=536870912 -put tiny.dat.txt tiny.dat.newblock.txt
There could be situations where Map tasks are CPU bound, that is, I/O is an insignificant part of the Map task runtime. In such cases, it is better to utilize all available computing resources within the cluster. Decreasing the fileinputformat.split.maxsize property to a value less than the HDFS block size can help increase cluster resource utilization.
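Both split-size knobs can also be set per job from the driver code instead of the configuration files. The following is a minimal sketch using the newer org.apache.hadoop.mapreduce API; the 512 MB and 64 MB figures are purely illustrative values, not recommendations.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeTuning {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "split-size-tuning");

        // Case 1: short-running, I/O-light Map tasks -- request bigger splits
        // by raising the minimum split size above the HDFS block size (512 MB here).
        FileInputFormat.setMinInputSplitSize(job, 512L * 1024 * 1024);

        // Case 2: CPU-bound Map tasks -- request smaller splits by lowering the
        // maximum split size below the HDFS block size (64 MB here).
        // FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);
    }
}
```

Only one of the two cases should normally be applied to a given job; remember that smaller splits trade locality and per-task overhead for higher parallelism.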
Sort and spill of intermediate outputs
The process of sending intermediate outputs from the Map tasks to the Reduce tasks involves complexity on both the Map side (as shown in the following diagram) and the Reduce side. Not only does the output of the Map tasks have to be partitioned based on the key to send it to the right Reduce task, but the keys within each partition have to be sorted as well. The partitioned data is then distributed to the appropriate reducers.

The Map task output workflow
The intermediate output records emitted by the Map task are not written directly to disk. They are buffered in local memory using a circular buffer before being spilled onto the disk. The size of this circular buffer is configured by the mapreduce.task.io.sort.mb property. The default value of this parameter is 100, that is, the circular buffer has a capacity of 100 MB. This property can be overridden in the mapred-default.xml or mapred-site.xml file placed in the etc/hadoop directory of the Hadoop installation. All the properties discussed in this section go in the same config file. The buffered key-value records are serialized but not sorted.
Each key-value record is augmented with some accounting information. This accounting information has a constant value of 16 bytes per record regardless of the size of the actual key or value payload.
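To put that overhead in perspective with an illustrative figure: if the average serialized record is around 100 bytes, the accounting information consumes roughly 16 / (100 + 16), or about 14 percent, of the buffer space used per record; for very small records the overhead fraction is correspondingly larger.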
The soft threshold for the percentage of the buffer that is allocated for the actual output records is given by the mapreduce.task.io.sort.spill.percent property. The default for this parameter is 0.8, that is, the output records from the buffer are flushed to disk when the buffer becomes 80 percent full.
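Both settings can also be supplied per job from the driver. The following is a minimal sketch using the property names quoted in this section (check mapred-default.xml on your distribution for the exact names); the 256 MB and 0.90 values are illustrative, not recommendations.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SortBufferTuning {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Grow the circular sort buffer from the 100 MB default to 256 MB.
        // When raising this, size the Map task JVM heap accordingly.
        conf.setInt("mapreduce.task.io.sort.mb", 256);

        // Spill when the buffer is 90 percent full instead of the default 80 percent.
        conf.set("mapreduce.task.io.sort.spill.percent", "0.90");

        Job job = Job.getInstance(conf, "sort-buffer-tuning");
    }
}
```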
Note
Before MAPREDUCE-64, the io.sort.record.percent property was a soft threshold for the percentage of the buffer allocated for accounting information. It had a default value of 0.05, and spills were triggered when the accounting information reached this threshold. This caused more spills and underutilization of the buffer, particularly for smaller records.
After this patch, the io.sort.record.percent property is auto-tuned based on the record size instead of being set manually.
The spilling happens on a background thread after the soft threshold for buffer occupancy is reached. The Map task is not blocked from writing to the circular buffer while the spilling is going on; however, if the circular buffer reaches the hard limit, the Map task is blocked until the spill is complete. The spilling thread partitions the records based on the key, sorts the keys in memory within each partition, and writes them to a file. A separate file is written for every spill.
The partitioner class is determined by the mapreduce.partitioner.class property. The spill thread uses an instance of this class to determine which Reduce task partition each record has to be assigned to.
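As a concrete illustration, a custom partitioner can be plugged into a job from the driver. The following sketch is hypothetical (the class name and routing rule are made up for illustration): it sends keys starting with a digit to partition 0 and spreads the rest over the remaining partitions by hash.

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical partitioner: digit-prefixed keys go to partition 0,
// all other keys are hashed across the remaining partitions.
public class FirstCharPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        if (numPartitions == 1) {
            return 0;
        }
        String k = key.toString();
        if (!k.isEmpty() && Character.isDigit(k.charAt(0))) {
            return 0;
        }
        return 1 + (k.hashCode() & Integer.MAX_VALUE) % (numPartitions - 1);
    }
}
```

In the driver, calling job.setPartitionerClass(FirstCharPartitioner.class) makes the spill thread use this class when assigning records to Reduce task partitions.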
Once the Map task is complete, the spill files are merged, with keys sorted within each partition, and written to a single output file. The mapreduce.cluster.local.dir parameter contains the directories where the output files are placed. The number of streams to merge simultaneously while writing the output is determined by the value of the mapreduce.task.io.sort.factor parameter. The default number is 10, that is, 10 open file handles will be present at a time during this step.
Each time a spill happens onto disk, the I/O required is three times that of a normal write: once the spill file is written and the Map task has ended, it is read back, merged to form a single output file, and rewritten to disk. It is best to spill only once, at the very end of the Map task.
The sort and spill step can be made more efficient in the following ways:
- Increasing the size of the circular buffer by setting the mapreduce.task.io.sort.mb property is one way to avoid or reduce the number of spills. When tweaking this parameter, it is good practice to monitor the Map task JVM heap size as well and increase it if necessary.
- Increasing the mapreduce.task.io.sort.factor property to around 100 or so. This will make the merge process faster and reduce disk access.
- Writing efficient custom serializers for the key and value types. The less space taken up by the serialized data, the more efficient the buffer usage.
- Writing Combiners to efficiently aggregate Map task outputs. This not only reduces the data transferred over the network to the Reduce tasks, but also helps in writing faster to disk and reduces the storage needed for the Map task spills and output files. The subsequent subsection gives more details about Combiners.
- Writing efficient key comparators and value grouping comparators can make a difference to the runtime of the sorting process; a raw comparator sketch follows this list.
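As an example of the last point, a comparator that works directly on the serialized bytes avoids deserializing keys during the Map-side sort. The following is a minimal sketch for IntWritable keys using the standard WritableComparator API; note that Hadoop already ships a raw comparator for IntWritable, so this is purely illustrative of the pattern you would follow for a custom key type.

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparator;

// Compares two serialized IntWritable keys without deserializing them,
// keeping the sort off the object-creation path.
public class RawIntComparator extends WritableComparator {
    public RawIntComparator() {
        super(IntWritable.class);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int left = readInt(b1, s1);   // IntWritable serializes as 4 big-endian bytes
        int right = readInt(b2, s2);
        return Integer.compare(left, right);
    }
}
```

Such a comparator is registered for the job with job.setSortComparatorClass(RawIntComparator.class).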
Note
MAPREDUCE-4039
Sorting of keys within a single partition might not be necessary in many kinds of MapReduce applications. Termed Sort Avoidance, this may lead to significant performance gains, as Reducers need not wait for all the Map tasks to complete before starting off.
This enhancement is currently marked as open and could appear in future releases.
Node-local Reducers or Combiners
Combiners are node-local reducers that aggregate intermediate Map output locally, on the output of individual mappers. Combiners can help reduce the amount of data that needs to be transferred across the network to the Reducers. The base class used to derive and implement a Combiner is the same as that used for a Reducer. However, depending on the application, the developer may choose to have different logic in the Combiner and the Reducer. The Combiner is specified for a job using the setCombinerClass() call.
If specified, the Combiner can possibly be called in two places:
- When the spills are being flushed onto disk by the spill thread
- When the spill files are being merged into a single output file to be consumed by the Reduce tasks
The former is called whenever a Combiner class is set for the job. The latter happens only if the number of spills exceeds the value of the mapreduce.map.combine.minspills configuration property. The default value of this limit is three, that is, the Combiner is called during the merge only if there are three or more spills.
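As an illustration, a word-count style job can use a summing reducer as its Combiner. The following is a minimal sketch, assuming Text keys and IntWritable counts; the class name is illustrative.

```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// A node-local reducer: pre-sums the counts emitted by a single Map task
// so fewer (key, value) pairs are spilled to disk and shuffled to Reducers.
public class SumCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable total = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        total.set(sum);
        context.write(key, total);
    }
}
```

In the driver, job.setCombinerClass(SumCombiner.class) wires it in. Summation works as a Combiner because it is associative and commutative; in general, a Combiner must not change the job's result whether it runs zero, one, or several times.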
Fetching intermediate outputs – Map-side
The Reducer needs to fetch the Map task output files over the network to execute the Reduce task. Since the network is often a bottleneck in a distributed system, the following Map-side optimizations can alleviate the cost of this transfer:
- The intermediate outputs of the Map tasks can be compressed using a suitable compression codec. The configuration property mapreduce.map.output.compress can be set to true to enable compression. The compression codec to be used can be specified by the mapreduce.map.output.compress.codec property. There are many choices available for compression, which are detailed in later chapters; a driver-side sketch follows this list.
- The Reduce tasks fetch the output partitions from the Map tasks using the HTTP protocol. The mapreduce.tasktracker.http.threads property is used to configure the number of threads that can service Reduce task fetch HTTP requests. Each fetch needs a single thread to service the request. Setting this property to a very low value would increase latency in servicing requests due to request queuing. The default value of this property is 40, indicating 40 threads.
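The two compression properties can also be set per job from the driver. The following is a minimal sketch; the choice of Snappy is illustrative and assumes the Snappy native libraries are available on the cluster.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;

public class MapOutputCompression {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Compress intermediate Map output before it is spilled and shuffled.
        conf.setBoolean("mapreduce.map.output.compress", true);
        conf.setClass("mapreduce.map.output.compress.codec",
                SnappyCodec.class, CompressionCodec.class);

        Job job = Job.getInstance(conf, "map-output-compression");
    }
}
```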