Mastering Hadoop

The Map task

The efficiency of the Map phase is largely determined by the nature of the job's inputs. We saw that having too many small files leads to a proliferation of Map tasks because of the large number of input splits. Another important statistic to note is the average runtime of a Map task. Too many or too few Map tasks are both detrimental to job performance. Striking a balance between the two is important, and much of it depends on the nature of the application and its data.

Tip

A rule of thumb, based on empirical evidence, is to aim for a runtime of around one to three minutes per Map task.

The dfs.blocksize attribute

The default block size of files in a cluster can be overridden in the cluster configuration file, hdfs-site.xml, generally present in the etc/hadoop folder of the Hadoop installation. In some cases, a Map task might take only a few seconds to process a block. In such cases, it is better to give bigger blocks to the Map tasks. This can be done in the following ways:

  • Increasing the mapreduce.input.fileinputformat.split.minsize parameter to be greater than the block size
  • Increasing the block size of the input file stored in HDFS

The former can lead to locality problems, as an InputSplit might then have to import data from blocks residing on other nodes. The latter method preserves locality, but might require you to reload the file into HDFS. This can be done using the following command, which uploads a file tiny.dat.txt into HDFS with a block size of 512 MB. The default block size is 128 MB (64 MB in older versions).

hadoop fs -D dfs.blocksize=536870912 -put tiny.dat.txt tiny.dat.newblock.txt
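Alternatively, the first approach, raising the minimum split size, can be sketched as an override in mapred-site.xml. The 512 MB value here is purely illustrative:

```xml
<!-- mapred-site.xml: illustrative value, not a recommendation -->
<!-- 536870912 bytes = 512 MB; input splits will not be smaller than this -->
<property>
  <name>mapreduce.input.fileinputformat.split.minsize</name>
  <value>536870912</value>
</property>
```

Remember that this route may sacrifice data locality, since a single split can now span blocks on multiple nodes.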

Tip

As a guideline, the number of Map tasks should not exceed 60,000 to 70,000 for any single application.

There could be situations where Map tasks are CPU bound, that is, I/O is an insignificant part of the Map task runtime. In such cases, it is better to utilize all available computing resources within the cluster. Decreasing the mapreduce.input.fileinputformat.split.maxsize property to be less than the HDFS block size can help increase cluster resource utilization.
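For such CPU-bound jobs, the override might look like the following sketch; the 64 MB value is illustrative and assumes the default 128 MB block size:

```xml
<!-- mapred-site.xml: illustrative value -->
<!-- 67108864 bytes = 64 MB, half the default 128 MB block size,
     roughly doubling the number of splits and hence Map tasks -->
<property>
  <name>mapreduce.input.fileinputformat.split.maxsize</name>
  <value>67108864</value>
</property>
```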

Tip

Fewer Map tasks that exploit data locality are good for job performance. However, in the face of failures, they might increase job latency: each Map task processes a significant chunk of the data, and a single failure might hold up the entire job.

Sort and spill of intermediate outputs

The process of sending intermediate outputs from the Map tasks to the Reduce tasks involves complexity on both the Map side (as shown in the following diagram) and the Reduce side. Not only does the output of the Map tasks have to be partitioned based on the key to send it to the right Reduce task, but the keys within each partition have to be sorted as well. The partitioned data is then distributed to the appropriate reducers.


The Map task output workflow

The intermediate output records emitted by the Map task are not written directly to disk. They are buffered in local memory using a circular buffer before being spilled to disk. The size of this circular buffer is configured by the mapreduce.task.io.sort.mb property. The default value of this parameter is 100, that is, the circular buffer has a capacity of 100 MB. Its default is defined in mapred-default.xml and can be overridden in the mapred-site.xml file placed in the etc/hadoop directory of the Hadoop installation. All the properties discussed in this section go in the same config file. The buffered key-value records are serialized, but not sorted.

Each key-value record is augmented with some accounting information. This accounting information has a constant size of 16 bytes per record, regardless of the size of the actual key or value payload.

The mapreduce.task.io.sort.spill.percent property gives the soft threshold of buffer occupancy beyond which a spill to disk is triggered. The default for this parameter is 0.8, that is, the buffered output records are flushed to disk when the buffer becomes 80 percent full.
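Both buffer-related properties can be overridden together in mapred-site.xml. The values below are illustrative, not recommendations:

```xml
<!-- mapred-site.xml: illustrative values -->
<property>
  <name>mapreduce.task.io.sort.mb</name>
  <value>256</value> <!-- 256 MB circular buffer instead of the default 100 MB -->
</property>
<property>
  <name>mapreduce.task.io.sort.spill.percent</name>
  <value>0.90</value> <!-- spill when the buffer is 90 percent full -->
</property>
```

When raising mapreduce.task.io.sort.mb, the Map task JVM heap may need to grow as well, since the buffer lives inside it.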

Note

Before MAPREDUCE-64, the io.sort.record.percent property was a soft threshold for the percentage of the buffer allocated for accounting information, with a default value of 0.05. A spill used to be triggered when the accounting information reached this threshold. This caused more spills and underutilization of the buffer, particularly for smaller records.

After this patch, this fraction is auto-tuned based on the record size instead of being set manually.

Spilling happens on a background thread once the soft threshold of buffer occupancy is reached. The Map task is not blocked from writing to the circular buffer while a spill is in progress. However, if the circular buffer fills up completely, the Map task blocks until the spill is complete. The spill thread partitions the records based on the key, sorts the keys in memory within each partition, and writes them to a file. A separate file is written for every spill.

Tip

The map.sort.class property determines the sorting algorithm used for sorting keys. The default is Quicksort, implemented in org.apache.hadoop.util.QuickSort.

The Partitioner class is determined by the mapreduce.job.partitioner.class property. The spill thread uses an instance of this class to determine the Reduce task partition each record has to be assigned to.

Once the Map task is complete, the spill files are merged and written to a single output file, with keys sorted within each partition. The mapreduce.cluster.local.dir parameter specifies the directories where the output files are placed. The number of spill streams to merge simultaneously while writing the output is determined by the mapreduce.task.io.sort.factor parameter. The default is 10, that is, at most 10 open file handles are present at a time during this step.
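As an illustration, the merge width could be widened in mapred-site.xml; 100 here is an illustrative value, not a recommendation:

```xml
<!-- mapred-site.xml: illustrative value -->
<property>
  <name>mapreduce.task.io.sort.factor</name>
  <value>100</value> <!-- merge up to 100 spill streams at a time -->
</property>
```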

Each record spilled to disk incurs roughly three times the I/O of a normal write: it is written once during the spill, and once the Map task has ended, it is read back and rewritten to disk as part of the merged output file. It is therefore best to spill only once, at the very end of the Map task.

Tip

If the merge step at the end of a Map task takes very little time, don't bother optimizing the sort and spill step.

The sort and spill step can be made more efficient in the following ways:

  • Increasing the size of the circular buffer by setting the mapreduce.task.io.sort.mb property is one way to avoid or reduce the number of spills. When tweaking this parameter, it is good practice to monitor the Map task JVM heap size as well and increase it if necessary.
  • Increasing the mapreduce.task.io.sort.factor property to around 100 or so. This makes the merge process faster and reduces disk access.
  • Writing efficient custom serializers for the key and value types. The less space the serialized data takes up, the more efficiently the buffer is used.
  • Writing Combiners to aggregate Map task outputs. This not only reduces the data transferred over the network to the Reduce tasks, but also speeds up disk writes and reduces the storage needed for Map task spills and output files. The subsequent subsection gives more details about Combiners.
  • Writing efficient key comparators and value grouping comparators, which can make a difference to the runtime of the sorting process.

Note

MAPREDUCE-4039

Sorting of keys within a single partition might not be necessary in many kinds of MapReduce applications. Termed Sort Avoidance, this may lead to significant performance gains, as Reducers need not wait for all Map tasks to complete before starting.

This enhancement is currently marked as open and could be coming in future releases.

Node-local Reducers or Combiners

Combiners are node-local Reducers that aggregate intermediate Map outputs locally, on a per-mapper basis. Combiners can reduce the amount of data that needs to be transferred to the Reducers. The base class used to derive and implement a Combiner is the same Reducer class used for Reduce tasks. However, depending on the application, the developer may choose to have different logic in the Combiner and the Reducer. The Combiner is specified for a job using the Job.setCombinerClass() call.

Tip

The JVM heap size for the Map task can be set using the mapreduce.map.java.opts parameter. The default value is -Xmx1024m.

If specified, the Combiner may be called in two places:

  • When the spills are being flushed onto disk by the spill thread
  • When the spill files are being merged into a single output file to be consumed by the Reduce tasks

The former happens whenever a Combiner class is set for the job. The latter happens only if the number of spills exceeds the mapreduce.map.combine.minspills configuration value. The default value of this limit is three, that is, the Combiner is called during the merge only if there are three or more spills.
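If a job spills heavily and the extra Combiner pass during the merge is worthwhile, this threshold could be lowered; the value below is illustrative:

```xml
<!-- mapred-site.xml: illustrative value -->
<property>
  <name>mapreduce.map.combine.minspills</name>
  <value>2</value> <!-- run the Combiner during the merge if there are 2 or more spills -->
</property>
```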

Tip

The intermediate files from a Map task that match a regular expression pattern can be preserved even after the job exits. This is done by specifying the pattern in the mapreduce.task.files.preserve.filepattern property.

Fetching intermediate outputs – Map-side

The Reduce tasks need to fetch the Map task output files over the network. With the network often being a bottleneck in a distributed system, the following Map-side optimizations can alleviate this:

  • The intermediate outputs of the Map tasks can be compressed using a suitable compression codec. The mapreduce.map.output.compress configuration property can be set to true to enable compression. The codec to be used can be specified by the mapreduce.map.output.compress.codec property. There are many choices available for compression, which are detailed in later chapters.
  • The Reduce tasks fetch the output partitions from the Map tasks using the HTTP protocol. The mapreduce.tasktracker.http.threads property configures the number of threads that can service the Reduce tasks' fetch HTTP requests. Each fetch needs a single thread to service the request. Setting this property to a very low value increases latency in servicing requests due to request queuing. The default value of this property is 40, that is, 40 threads.
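Putting the two optimizations together, a sketch of the mapred-site.xml overrides might look like this; the Snappy codec and the thread count are illustrative choices, not recommendations:

```xml
<!-- mapred-site.xml: illustrative values -->
<property>
  <name>mapreduce.map.output.compress</name>
  <value>true</value> <!-- compress intermediate Map outputs -->
</property>
<property>
  <name>mapreduce.map.output.compress.codec</name>
  <value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
<property>
  <name>mapreduce.tasktracker.http.threads</name>
  <value>60</value> <!-- more threads to service Reduce-side fetch requests -->
</property>
```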