Friday 15 March 2013

MapReduce Fundamentals

MapReduce is a programming model for efficient distributed computing.
The core concept of MapReduce in Hadoop is that the input may be split into logical chunks, and each chunk may be processed independently by a map task. The results of these independently processed chunks can be physically partitioned into distinct sets, which are then sorted. Each sorted partition is passed to a reduce task.
Every MapReduce program must specify a Mapper and typically a Reducer.
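As an illustration, here is a minimal word-count style Mapper and Reducer. This is only a sketch, assuming the org.apache.hadoop.mapreduce (new) API; the class names are illustrative.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Mapper: emits (word, 1) for every word in an input line.
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String token : value.toString().split("\\s+")) {
            if (!token.isEmpty()) {
                word.set(token);
                context.write(word, ONE);
            }
        }
    }
}

// Reducer: sums the counts emitted for each word.
class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}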


Input and Output Formats

A MapReduce job may specify how its input is to be read by specifying an InputFormat to be used, which provides:
– InputSplit (how the input is divided among map tasks)
– RecordReader (how records are read from each split)
A MapReduce job may specify how its output is to be written by specifying an OutputFormat to be used.
These default to TextInputFormat and TextOutputFormat, which process line-based text data.
For binary key/value data there are SequenceFileInputFormat and SequenceFileOutputFormat.
These formats are typically file-based, but they are not required to be.
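A minimal sketch of wiring the formats into a job driver, assuming the Hadoop 2.x mapreduce API (the input/output paths are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class FormatConfigSketch {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "format example");
        // TextInputFormat/TextOutputFormat are already the defaults;
        // setting them explicitly just shows where the hooks are.
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path("/input"));      // placeholder path
        FileOutputFormat.setOutputPath(job, new Path("/output"));   // placeholder path
    }
}

Swapping in SequenceFileInputFormat and SequenceFileOutputFormat at the same two calls switches the job to SequenceFile data.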

How many Maps and Reduces

Maps
– Usually one per HDFS block being processed; this is the default.
– Otherwise, the number of maps can be specified as a hint.
– The number of maps can also be controlled by specifying the minimum split size.
– The actual sizes of the map inputs are computed by:
• max(min(block_size, data/#maps), min_split_size)
Reduces
– Unless the amount of data being processed is small, a common rule of thumb (sketched below) is:
• 0.95 * num_nodes * mapred.tasktracker.reduce.tasks.maximum
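A minimal sketch of applying that rule of thumb in a job driver; the node count and per-node slot count here are hypothetical values that would normally come from the cluster's mapred.tasktracker.reduce.tasks.maximum setting:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ReduceCountSketch {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "reduce count example");

        int numNodes = 10;             // hypothetical cluster size
        int reduceSlotsPerNode = 2;    // hypothetical mapred.tasktracker.reduce.tasks.maximum

        // The 0.95 factor lets all reduces launch in one wave while leaving slack for failures.
        job.setNumReduceTasks((int) (0.95 * numNodes * reduceSlotsPerNode));
    }
}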

vi/vim Cheatsheet

There are many ways to edit files in Unix, and for me one of the best is the screen-oriented text editor vi. There is also an improved version of vi called vim (vi improved).

Starting the vi editor:

Command           Description
vi filename       Creates a new file if it does not already exist, otherwise opens the existing file.
vi -R filename    Opens an existing file in read-only mode.

Operation Modes:
vi has two modes of operation
  • Command mode: This mode enables you to perform administrative tasks such as saving files, executing commands, moving the cursor, copying (yanking), cutting, and pasting lines or words, and finding and replacing. In this mode, whatever you type is interpreted as a command.
  • Insert mode: This mode enables you to insert text into the file. Everything typed in this mode is interpreted as input and written to the file.
vi always starts in command mode. To enter text, you must be in insert mode; to switch to insert mode, simply type i. To get out of insert mode, press the Esc key, which puts you back into command mode.
Hint: If you are not sure which mode you are in, press the Esc key twice and you will be in command mode. To see the difference, open a file in vi, start typing some characters, and then press Esc to return to command mode.


vi cheatsheet:


Wednesday 13 March 2013

MapReduce with AWK


In my last post, Lets MapReduce with Pentaho, I wrote a MapReduce program in Pentaho Data Integration. Now I am writing some MapReduce code in AWK to:
  • Calculate the number/percentage of flights delayed by over 15 minutes, aggregated at day level (on the airline dataset).
I will be using the Hadoop Streaming package to run my AWK MapReduce task.

Any MapReduce code should follow the common basics listed below:
  • The Mapper has a map method that transforms input (key, value) pairs into any number of intermediate (key’, value’) pairs.
  • The Reducer has a reduce method that transforms intermediate (key’, value’*) aggregates into any number of output (key’’, value’’) pairs.


Number/percentage of flights delayed by over 15 minutes aggregated at day level

I write map code to emit the date in yyyyMMdd format, a constant value 1 for each flight, and a boolean flag that is 1 for flights with DepDelay (column #16) > 15.
#!/usr/bin/awk -f
# Mapper: emit "yyyyMMdd,1,delayed_flag" for every flight record.
BEGIN {
        FS=",";
}
{
        if ($16 > 15)        # DepDelay greater than 15 minutes
        {       printf("%04d%02d%02d,%d,%d\n", $1,$2,$3,1,1);}
        else
        {       printf("%04d%02d%02d,%d,%d\n", $1,$2,$3,1,0);}
}
   
The intermediate output of the map code will look like this:
......
19871001,1,1
19871001,1,1
19871002,1,0
19871002,1,0
19871002,1,0
..... 
  
Note that the output of the mapper is sorted by key before it reaches the reducer; Hadoop Streaming, like any MapReduce job, sorts the intermediate data during the shuffle phase.
The reduce code keeps a count of total flights and delayed flights per day.
To aggregate (sum) the data at day level I will use associative arrays in AWK.
#!/usr/bin/awk -f
# Reducer: sum flight counts and delayed-flight flags per day, then
# print "date,total_flights,delayed_flights,percent_delayed".
BEGIN {FS=",";
}
{       A[$1]+=$2;      # total flights, keyed by date ($1)
        B[$1]+=$3;      # delayed flights, keyed by date ($1)
}
END {
        for(i in A)    # iterate over all dates seen
        {printf "%s,%d,%d,%5.2f\n", i,A[i],B[i],B[i]*100/A[i]}
}
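
Before submitting the job to the cluster, the scripts can be tested locally by simulating the shuffle-and-sort step with sort. This is just a sketch; it assumes both scripts are executable and that a sample of the CSV is available locally:
cat nh_1987.csv | ./airline2_m.awk | sort | ./airline2_r.awk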
 
Run this MapReduce code with Hadoop Streaming:
hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.0.0.jar \
  -input /airline/nh_1987.csv \
  -output /airline/output \
  -mapper path_to/airline2_m.awk \
  -reducer path_to/airline2_r.awk \
  -inputformat org.apache.hadoop.mapred.TextInputFormat \
  -outputformat org.apache.hadoop.mapred.TextOutputFormat \
  -file path_to/airline2_m.awk \
  -file path_to/airline2_r.awk
  
We can check the output of the above code by using the cat command on HDFS, or by copying the file from HDFS to the local filesystem:
hadoop fs -cat /airline/output/part-00000
hadoop fs -get /airline/output/part-00000