Hadoop Map Reduce Jobs Test

Hadoop MapReduce is a software framework for easily writing applications which process vast amounts of data (multi-terabyte data-sets) in-parallel on large clusters (thousands of nodes) of commodity hardware in a reliable, fault-tolerant manner.

The MapReduce process typically goes through four phases, namely:

  • Splitting: An input to a MapReduce job is divided into fixed-size pieces called input splits. An input split is a chunk of the input that is consumed by a single map task.
  • Mapping: This is the first phase in the execution of the MapReduce program. In this phase, the data in each split is passed to a mapping function to produce output values.
  • Shuffling: This phase consumes the output of the Mapping phase. Its task is to consolidate the relevant records from the Mapping phase output.
  • Reducing: In this phase, the output values from the Shuffling phase are aggregated. This phase combines the values from the Shuffling phase and returns a single output value, as illustrated in the walk-through below.
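
To make these phases concrete, here is a minimal word-count walk-through. The input lines and counts below are purely illustrative:

    Input:      "Deer Bear River" and "Car Car River"
    Splitting:  each line becomes one input split, consumed by one map task
    Mapping:    (Deer,1) (Bear,1) (River,1)  |  (Car,1) (Car,1) (River,1)
    Shuffling:  (Bear,[1]) (Car,[1,1]) (Deer,[1]) (River,[1,1])
    Reducing:   (Bear,1) (Car,2) (Deer,1) (River,2)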

This complete execution process is controlled by the following entities:

  • JobTracker: Acts as the master (responsible for the complete execution of a submitted job)
  • Multiple TaskTrackers: Act as slaves, each performing a portion of the job

For every job submitted for execution in the system, there is one JobTracker that resides on the NameNode, and there are multiple TaskTrackers that reside on multiple DataNodes.

  • A job is divided into multiple tasks which are then run on multiple DataNodes in a cluster.
  • It is the responsibility of JobTracker to coordinate the activity by scheduling tasks to run on different DataNodes.
  • The TaskTracker on individual DataNodes is responsible for the execution of the assigned tasks - i.e., the portion of the job assigned to each of them as tasks.
  • Periodically, the TaskTracker on a DataNode sends a progress report to the JobTracker, updating it with the status of the tasks executed on that node.
  • In addition, the TaskTracker periodically sends a 'heartbeat' signal to the JobTracker so as to notify it of the current state of the system.
  • Thus, the JobTracker keeps track of the overall progress of each job. In the event of task failure, the JobTracker can reschedule it on a different TaskTracker.

Hadoop users generally do not take kindly to the failure of MapReduce jobs, as such failures impact application performance. Therefore, to assure users of an above-par experience with a Hadoop cluster, administrators have to monitor the MapReduce jobs that each user is running on the cluster and track their status. This way, they can instantly identify failed jobs, isolate the user who is running such jobs, and investigate the reason for the failure. Periodic evaluation of the performance of MapReduce jobs is also necessary, because it may point administrators to processing bottlenecks and resource crunches caused by improper configuration of the jobs. With the help of the Hadoop Map Reduce Jobs test, administrators can achieve all of the above!

This test auto-discovers the users running MapReduce jobs on the cluster, and for each user, reports the count of jobs in different states. In the process, the test alerts administrators to failed jobs and jobs with errors. Additionally, for each user, the test also measures how much time the jobs run by that user took to complete. This points administrators to slow jobs and the users running them. The test also highlights users whose jobs took the maximum time for map/reduce processing. Detailed diagnostics not only shed light on such jobs, but also accurately tell where the job execution was bottlenecked - in running map tasks or in running reduce tasks? This greatly aids troubleshooting. Moreover, the test also pinpoints jobs requiring more heap memory. This way, the test reveals to administrators if improper job configuration is what caused job execution to slow down.

Target of the test : A Hadoop cluster

Agent deploying the test : A remote agent

Outputs of the test : One set of results for each user running MapReduce jobs on the cluster

Configurable parameters for the test
Parameter Description

Test Period

How often should the test be executed?

Host

The IP address of the NameNode that processes client connections to the cluster. NameNode is the master node in the Apache Hadoop HDFS Architecture that maintains and manages the blocks present on the DataNodes (slave nodes). NameNode is a very highly available server that manages the File System Namespace and controls access to files by clients.

Port

The port at which the NameNode accepts client connections. NameNode is the master node in the Apache Hadoop HDFS Architecture that maintains and manages the blocks present on the DataNodes (slave nodes). NameNode is a very highly available server that manages the File System Namespace and controls access to files by clients. By default, the NameNode's client connection port is 8020.

Name Node Web Port

The eG agent collects metrics using Hadoop's WebHDFS REST API. While some of these API calls pull metrics from the NameNode, some others get metrics from the resource manager. NameNode is the master node in the Apache Hadoop HDFS Architecture that maintains and manages the blocks present on the DataNodes (slave nodes). NameNode is a very highly available server that manages the File System Namespace and controls access to files by clients. To run API commands on the NameNode and pull metrics, the eG agent needs access to the NameNode's web port.

To determine the correct web port of the NameNode, do the following:

  • Open the hdfs-default.xml file in the hadoop/conf/app directory.
  • Look for the dfs.namenode.http-address parameter in the file.
  • This parameter is configured with the IP address and base port on which the DFS NameNode web user interface listens. The format of this configuration is: <IP_Address>:<Port_Number>. Given below is a sample configuration:

    192.168.10.100:50070

Configure the <Port_Number> in the specification as the Name Node Web Port. In the case of the above sample configuration, this will be 50070.
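
For reference, the entry in the file is an XML property element; a typical specification (reusing the sample address above - the value on your cluster will differ) looks like this:

    <property>
      <name>dfs.namenode.http-address</name>
      <value>192.168.10.100:50070</value>
    </property>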

Name Node User Name

The eG agent collects metrics using Hadoop's WebHDFS REST API. While some of these API calls pull metrics from the NameNode, some others get metrics from the resource manager. NameNode is the master node in the Apache Hadoop HDFS Architecture that maintains and manages the blocks present on the DataNodes (slave nodes). NameNode is a very highly available server that manages the File System Namespace and controls access to files by clients.

In some Hadoop configurations, a simple authentication user name may be required for running API commands and collecting metrics from the NameNode. When monitoring such Hadoop installations, specify the name of the simple authentication user here. If no such user is available/required, then do not disturb the default value of this parameter, which is none.

Resource Manager IP and Resource Manager Web Port

The eG agent collects metrics using Hadoop's WebHDFS REST API. While some of these API calls pull metrics from the NameNode, some others get metrics from the resource manager. The YARN Resource Manager Service (RM) is the central controlling authority for resource management and makes resource allocation decisions.

To pull metrics from the resource manager, the eG agent first needs to connect to the resource manager. For this, you need to configure this test with the IP address/host name of the resource manager and its web port. Use the Resource Manager IP and Resource Manager Web Port parameters to configure these details.

To determine the IP/host name and web port of the resource manager, do the following:

  • Open the yarn-site.xml file in the /opt/mapr/hadoop/hadoop-2.x.x/etc/hadoop directory.
  • Look for the yarn.resourcemanager.webapp.address parameter in the file.
  • This parameter is configured with the IP address/host name and web port of the resource manager. The format of this configuration is: <IP_Address_or_Host_Name>:<Port_Number>. Given below is a sample configuration:

    192.168.10.100:8080

Configure the <IP_Address_or_Host_Name> in the specification as the Resource Manager IP, and the <Port_Number> as the Resource Manager Web Port. In the case of the above sample configuration, this will be 8080.
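
For reference, the entry in yarn-site.xml is an XML property element; a typical specification (reusing the sample address above - the value on your cluster will differ) looks like this:

    <property>
      <name>yarn.resourcemanager.webapp.address</name>
      <value>192.168.10.100:8080</value>
    </property>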

Resource Manager Username

The eG agent collects metrics using Hadoop's WebHDFS REST API. While some of these API calls pull metrics from the NameNode, some others get metrics from the resource manager. The YARN Resource Manager Service (RM) is the central controlling authority for resource management and makes resource allocation decisions.

In some Hadoop configurations, a simple authentication user name may be required for running API commands and collecting metrics from the resource manager. When monitoring such Hadoop installations, specify the name of the simple authentication user here. If no such user is available/required, then do not disturb the default value of this parameter, which is none.

DD Row Count

By default, the detailed diagnosis of this test, if enabled, will report only the top 10 records. This is why the DD Row Count parameter is set to 10 by default. If you want to include more or fewer records in the detailed diagnosis, then change the value of this parameter accordingly.

Detailed Diagnosis

To make diagnosis more efficient and accurate, eG Enterprise embeds an optional detailed diagnostic capability. With this capability, the eG agents can be configured to run detailed, more elaborate tests as and when specific problems are detected. To enable the detailed diagnosis capability of this test for a particular server, choose the On option. To disable the capability, choose the Off option. The option to selectively enable/disable the detailed diagnosis capability will be available only if the following conditions are fulfilled:

  • The eG manager license should allow the detailed diagnosis capability
  • Both the normal and abnormal frequencies configured for the detailed diagnosis measures should not be 0.
Measurements made by the test
Measurement Description Measurement Unit Interpretation

Total jobs

Indicates the total number of MapReduce jobs that are run by this user.

Number

Compare the value of this measure across users to know which user is imposing the maximum MapReduce load on the cluster.

New jobs

Indicates the number of MapReduce jobs started by this user during the last measurement period.

Number

 

Initiated jobs

Indicates the number of MapReduce jobs that this user has initiated.

Number

Job setup is performed during job initialization. For example, the temporary output directory for the job is created when the job is initialized.

Running jobs

Indicates the number of jobs of this user that are currently in the RUNNING state.

Number

Once the setup task completes, the job will be moved to RUNNING state.

Succeeded jobs

Indicates the number of jobs run by this user that successfully completed execution.

Number

A high value is desired for this measure. Ideally, the value of this measure should be equal to the value of the Total jobs measure.

Failed jobs

Indicates the number of jobs run by this user that failed.

Number

A low or 0 value is desired for this measure.

A non-zero value indicates that one/more jobs have failed for this user. In such a case, use the detailed diagnosis of this measure to know which jobs failed.

By default, a job fails if any one of its constituent tasks fails four times. Once a task has failed four times, it will not be retried again. This value is configurable: the maximum number of attempts to run a task is controlled by the mapreduce.map.maxattempts property for map tasks and by mapreduce.reduce.maxattempts for reduce tasks.

The most common occurrence of this failure is when user code in the map or reduce task throws a runtime exception. If this happens, the task JVM reports the error back to its parent application master before it exits. The error ultimately makes it into the user logs. The application master marks the task attempt as failed, and frees up the container so its resources are available for another task.

Another failure mode is the sudden exit of the task JVM; perhaps there is a JVM bug that causes the JVM to exit under a particular set of circumstances exposed by the MapReduce user code. In this case, the node manager notices that the process has exited and informs the application master, so it can mark the attempt as failed.

Hanging tasks are dealt with differently. The application master notices that it hasn’t received a progress update for a while and proceeds to mark the task as failed. The task JVM process will be killed automatically after this period. The timeout period after which tasks are considered failed is normally 10 minutes and can be configured on a per-job basis (or a cluster basis) by setting the mapreduce.task.timeout property to a value in milliseconds.
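
As an illustration only, the retry and timeout behavior described above is governed by job configuration properties; the values shown below are the usual defaults, not recommendations, and the exact file in which they are set (mapred-site.xml or a per-job configuration) depends on your setup:

    <property>
      <name>mapreduce.map.maxattempts</name>
      <value>4</value>        <!-- maximum attempts per map task -->
    </property>
    <property>
      <name>mapreduce.reduce.maxattempts</name>
      <value>4</value>        <!-- maximum attempts per reduce task -->
    </property>
    <property>
      <name>mapreduce.task.timeout</name>
      <value>600000</value>   <!-- 10 minutes, in milliseconds -->
    </property>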

Kill wait jobs

Indicates the number of jobs initiated by this user that are waiting to be killed.

Number

Use the detailed diagnosis of this measure to identify the jobs that are in the KILLWAIT state.

Killed jobs

Indicates the number of jobs that this user killed.

Number

Use the detailed diagnosis of this measure to identify the killed jobs.

Error jobs

Indicates the number of jobs of this user that spewed errors.

Number

Ideally, the value of this measure should be 0.

If a non-zero value is reported, then you can use the detailed diagnosis of this measure to know which jobs experienced errors during execution.

Average job duration

Indicates the average time that the jobs of this user took for execution.

Seconds

An unusually high value for this measure is a cause for concern, as it implies that the jobs of a user are taking much longer to execute than normal. To identify which jobs are taking the maximum time, use the detailed diagnosis of this measure.

Maximum job duration

Indicates the maximum time taken by the jobs of this user for execution.

Seconds

 

Maximum map/reduce processing time

Indicates the maximum time taken by the jobs of this user to perform MapReduce processing.

Seconds

If this value is abnormally high for any user, then use the detailed diagnosis of this measure to know which jobs took the longest to perform map/reduce processing and where they spent the maximum time - in running map tasks or in running reduce tasks?

Maximum GC time percentage

Indicates the maximum percentage of time that the jobs of this user spent in garbage collection.

Percent

A high value of this measure indicates that jobs do not have enough heap memory for processing, owing to which garbage collection occurs often. If this value is very high for any user, then use the detailed diagnosis of this measure to figure out which specific jobs spent too much time in garbage collection. By increasing the heap size allocated to such jobs, you can reduce this percentage significantly.
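
If insufficient heap is indeed the cause, the heap available to map and reduce task JVMs is typically controlled by job-level properties such as the ones below; the sizes shown are illustrative examples only, and the Java heap (-Xmx) should stay within the container sizes set by mapreduce.map.memory.mb and mapreduce.reduce.memory.mb:

    <property>
      <name>mapreduce.map.java.opts</name>
      <value>-Xmx1536m</value>      <!-- example heap for map task JVMs -->
    </property>
    <property>
      <name>mapreduce.reduce.java.opts</name>
      <value>-Xmx3072m</value>      <!-- example heap for reduce task JVMs -->
    </property>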

Maximum input records for reduce task

Indicates the number of jobs of this user that are taking the maximum number of input records for processing reduce tasks.

Number

To know which jobs are taking the maximum number of input records, use the detailed diagnosis of this measure.

Maximum spilled records to disk

Indicates the number of jobs of this user that have spilled the maximum number of records to disk.

Number

Map output data is stored in memory, but when the buffer fills up, data is dumped (spilled) to disk. A separate thread is spawned to merge all of the spilled data into a single larger sorted file for the reducers. Spills to disk should be minimized, as more than one spill results in increased disk activity because the merging thread has to read and write data to disk. Tracking this metric can help determine if configuration changes need to be made (like increasing the memory available for map tasks) or if additional processing (compression) of the input data is necessary.

If this measure reports a high value for any user, then use the detailed diagnosis of this measure to know which jobs spilled the maximum number of records to disk. You can then review the configuration of such jobs to see if it needs to be tweaked to reduce disk spills and improve performance.
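
As a hedged sketch, the settings most commonly reviewed when tuning disk spills are the map-side sort buffer and map output compression; the values below are the usual defaults, shown for illustration only:

    <property>
      <name>mapreduce.task.io.sort.mb</name>
      <value>100</value>       <!-- in-memory buffer (MB) used for sorting map output -->
    </property>
    <property>
      <name>mapreduce.map.output.compress</name>
      <value>false</value>     <!-- set to true to compress intermediate map output -->
    </property>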