Thursday 31 October 2013

Hadoop MapReduce On MongoDB

MongoDB supports running Hadoop MapReduce programs.
In this post we shall use the Enron email dataset to build a list of unique sender/recipient pairs, counting how many times each pair occurs.
Download the Enron dataset dump and load it into MongoDB using mongorestore, so that the messages end up in the enron.messages collection.

For this MapReduce program we shall use a custom key which is defined below.

MailPair WritableComparable Class

MailPair is just a simple "POJO" that contains Strings for the from and to values, and implements WritableComparable so that it can be serialized across Hadoop nodes and sorted.

package com.example;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class MailPair implements WritableComparable{

    String from;
    String to;

    public MailPair(){
    }

    public MailPair(String from, String to){
        this.from = from;
        this.to = to;
    }

    public void readFields(DataInput in) throws IOException{
        this.from = in.readUTF();
        this.to = in.readUTF();
    }

    public void write(DataOutput out) throws IOException{
        out.writeUTF(this.from);
        out.writeUTF(this.to);
    }

    @Override
    public boolean equals(Object o) {
        if (o instanceof MailPair) {
           MailPair mp = (MailPair) o;
           return from.equals(mp.from) && to.equals(mp.to);
        }
        return false;
    }
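
    // hashCode() consistent with equals(), so identical MailPair keys reach the same
    // reducer under the default HashPartitioner. (Added here; not part of the original listing.)
    @Override
    public int hashCode() {
        return from.hashCode() * 31 + to.hashCode();
    }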

    @Override
    public int compareTo(Object o) {
        if(!(o instanceof MailPair)){
            return -1;
        }
        MailPair mp = (MailPair)o;
        int first = from.compareTo(mp.from);
        if(first != 0) return first;
        int second = to.compareTo(mp.to); 
        if(second != 0) return second;
        return 0;
    }

}

Mapper Class

The mapper class will get the headers field from each document, parse out the sender from the From field and the recipients from the To field, and construct a MailPair object for each sender/recipient pair, which will act as the key. We then emit the value 1 for each key.

package com.example;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.bson.BSONObject;

public class EnronMailMapper
    extends Mapper<Object, BSONObject, MailPair, IntWritable> {

    private static final Log LOG = LogFactory.getLog( EnronMailMapper.class );


    @Override
    public void map(Object key, BSONObject val, final Context context)
            throws IOException, InterruptedException {
        if (val.containsKey("headers")) {
            BSONObject headers = (BSONObject) val.get("headers");
            if (headers.containsKey("From") && headers.containsKey("To")) {
                String from = (String) headers.get("From");
                String to = (String) headers.get("To");
                String[] recips = to.split(",");
                // emit one (sender, recipient) pair with a count of 1 for every recipient
                for (int i = 0; i < recips.length; i++) {
                    String recip = recips[i].trim();
                    if (recip.length() > 0) {
                        context.write(new MailPair(from, recip), new IntWritable(1));
                    }
                }
            }
        }
    }

}

Reducer Class

The reducer class will take the collected values for each key, sum them together, and write the output.

package com.example;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.bson.BSONObject;

import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.hadoop.io.BSONWritable;

public class EnronMailReducer
    extends Reducer<MailPair, IntWritable, BSONWritable, IntWritable> {

    private static final Log LOG = LogFactory.getLog( EnronMailReducer.class );

    @Override
    public void reduce( final MailPair pKey,
                        final Iterable<IntWritable> pValues,
                        final Context pContext )
            throws IOException, InterruptedException{
        int sum = 0;
        for ( final IntWritable value : pValues ){
            sum += value.get();
        }
        BSONObject outDoc = BasicDBObjectBuilder.start()
                .add( "f" , pKey.from )
                .add( "t" , pKey.to )
                .get();
        BSONWritable pkeyOut = new BSONWritable(outDoc);

        pContext.write( pkeyOut, new IntWritable(sum) );
    }
}

Driver Class


package com.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.ToolRunner;

import com.mongodb.hadoop.util.MongoTool;

public class EnronMail extends MongoTool {

    public int run( String[] args ) throws Exception{
        final Configuration conf = getConf();

        final Job job = new Job( conf, "enron-messages" );
        job.setJarByClass(EnronMail.class);
        job.setMapperClass(EnronMailMapper.class);
        job.setReducerClass(EnronMailReducer.class);

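        // Read input from MongoDB through the mongo-hadoop connector; the input collection comes from -Dmongo.input.uri.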
        job.setInputFormatClass(com.mongodb.hadoop.MongoInputFormat.class);
        job.setMapOutputKeyClass(MailPair.class);
        job.setMapOutputValueClass(IntWritable.class);

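        // Write results back to MongoDB; the output collection comes from -Dmongo.output.uri.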
        job.setOutputFormatClass(com.mongodb.hadoop.MongoOutputFormat.class);
        job.setOutputKeyClass(org.apache.hadoop.io.Text.class);
        job.setOutputValueClass(org.apache.hadoop.io.IntWritable.class);

        return job.waitForCompletion( true ) ? 0 : 1;
    }
 
    public static void main( final String[] pArgs ) throws Exception{
        System.exit( ToolRunner.run( new EnronMail(), pArgs ) );
    }
}
Compile the classes and package them into a jar named enron.jar. To run this MapReduce job use the command below:

hadoop jar enron.jar com.example.EnronMail -Dmongo.input.uri=mongodb://localhost:27017/enron.messages -Dmongo.output.uri=mongodb://localhost:27017/enron.messages_out
After running the MapReduce program we can query the enron.messages_out collection to verify that it has been populated properly.

use enron
db.messages_out.findOne()
{
 "_id" : {
  "f" : "-nikole@excite.com",
  "t" : "bill.williams@enron.com"
 },
 "value" : 8
}

Implementing Custom Writable

Hadoop MapReduce uses implementations of Writable for interacting with user-provided Mappers and Reducers. Hadoop ships with many Writable implementations (such as IntWritable, LongWritable and Text), but sometimes we need to pass custom objects, and these custom objects should implement Hadoop's Writable interface. In this post we are going to describe a custom class, IntPair. To implement the Writable interface we need to implement two methods:

public interface Writable {
  void readFields(DataInput in) throws IOException;
  void write(DataOutput out) throws IOException;
}
The code for IntPair is given below:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;

public class IntPair implements Writable {
 private IntWritable first;
 private IntWritable second;
 
 public IntPair() {
  set(new IntWritable(), new IntWritable());
 }
 
 public IntPair(Integer first, Integer second) {
  set(new IntWritable(first), new IntWritable(second));
 }
 
 public void set(IntWritable first, IntWritable second) {
  this.first = first;
  this.second = second;
 }
 
 public IntWritable getFirst() {
  return first;
 }
 
 public Integer getFirstInt() {
  return first.get();
 }
 
 public Integer getSecondInt() {
  return second.get();
 }
 
 public IntWritable getSecond() {
  return second;
 }
 
 public void write(DataOutput out) throws IOException {
  first.write(out);
  second.write(out);
 }
 
 public void readFields(DataInput in) throws IOException {
  first.readFields(in);
  second.readFields(in);
 }
}
Now we can use this IntPair class in Hadoop MapReduce as a value type. If we want to use IntPair as a key in MapReduce then it needs to implement WritableComparable as well, which we shall cover in a different post.
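
As a quick illustration of using IntPair as a map output value, below is a minimal sketch of a mapper; the class name, key and field layout are hypothetical, assuming a text input of comma-separated integer pairs:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper emitting IntPair as the map output value.
public class IntPairMapper extends Mapper<LongWritable, Text, Text, IntPair> {

 @Override
 protected void map(LongWritable key, Text value, Context context)
   throws IOException, InterruptedException {
  // Expect lines of the form "first,second", e.g. "3,7".
  String[] parts = value.toString().split(",");
  if (parts.length == 2) {
   IntPair pair = new IntPair(Integer.valueOf(parts[0].trim()),
     Integer.valueOf(parts[1].trim()));
   context.write(new Text("pair"), pair);
  }
 }
}
In the driver, such a job would register the value class with job.setMapOutputValueClass(IntPair.class).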

Wednesday 16 October 2013

MapReduce on HBase Table

In my last post, HBase Table MapReduce Basics, I explained some basic guidelines to follow while writing a MapReduce program over HBase tables. In this post I will share a sample MapReduce program which reads data from an HBase table, does some aggregation and writes the output to another HBase table.

For our input we shall create an HBase table with the data below:

create 'test1', 'cf1'
put 'test1', '20130101#1', 'cf1:sales', '100'
put 'test1', '20130101#2', 'cf1:sales', '110'
put 'test1', '20130102#1', 'cf1:sales', '200'
put 'test1', '20130102#2', 'cf1:sales', '210'

create 'test2', 'cf1'
For the table "test1", the rowkey is composed of date and store number and we have sales value for each store.
We shall do an aggregation on date to get total sales.

The mapper class is:

public class testMapper extends TableMapper<Text, IntWritable> {
 
 @Override
 public void map(ImmutableBytesWritable rowKey, Result columns, Context context)
   throws IOException, InterruptedException {

  try {
   // get rowKey and convert it to string
   String inKey = new String(rowKey.get());
   // set new key having only date
   String oKey = inKey.split("#")[0];
   // get sales column in byte format first and then convert it to string (as it is stored as string from hbase shell)
   byte[] bSales = columns.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("sales"));
   String sSales = new String(bSales);
   Integer sales = new Integer(sSales);
   // emit date and sales values
   context.write(new Text(oKey), new IntWritable(sales));
  } catch (RuntimeException e){
   e.printStackTrace();
  }
 }
}
The reducer class is:

public class testReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
 
 @Override
 public void reduce(Text key, Iterable<IntWritable> values, Context context) 
   throws IOException, InterruptedException {
  try {
   int sum = 0;
   // loop through the sales values and add them to sum
   for (IntWritable sales : values) {
    sum += sales.get();
   }
   
   // create hbase put with rowkey as date
   Put insHBase = new Put(Bytes.toBytes(key.toString())); // Text.getBytes() may return a padded buffer, so convert via toString()
   // insert sum value to hbase 
   insHBase.add(Bytes.toBytes("cf1"), Bytes.toBytes("sum"), Bytes.toBytes(sum));
   // write data to Hbase table
   context.write(null, insHBase);

  } catch (Exception e) {
   e.printStackTrace();
  }
 }
}
The driver class is:

public class testDriver {
  public static void main(String[] args) throws Exception {
 Configuration conf = HBaseConfiguration.create(); // picks up hbase-default.xml and hbase-site.xml
 
 // define scan and define column families to scan
 Scan scan = new Scan();
 scan.addFamily(Bytes.toBytes("cf1"));

 Job job = new Job(conf); 
 
    job.setJarByClass(testDriver.class);
 // define input hbase table
    TableMapReduceUtil.initTableMapperJob(
     "test1",
        scan,
        testMapper.class,
        Text.class,
        IntWritable.class,
        job);
 // define output table
    TableMapReduceUtil.initTableReducerJob(
      "test2",
      testReducer.class, 
      job);

    job.waitForCompletion(true);
  }
}
We shall build the jar and run the hadoop jar command for the testDriver class. This will populate the data into the output HBase table test2.

hbase(main):013:0> scan 'test2'
ROW                                COLUMN+CELL                                                                                        
 20130101                          column=cf1:sum, timestamp=1381869476363, value=\x00\x00\x00\xD2
 20130102                          column=cf1:sum, timestamp=1381869476363, value=\x00\x00\x01\x9A
2 row(s) in 0.0240 seconds
The sum values are displayed as bytes (HBase stores everything as bytes); we can convert them to a readable integer format in the HBase shell.

hbase(main):014:0> org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x00\xD2".to_java_bytes)
=> 210
hbase(main):015:0> org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x01\x9A".to_java_bytes)
=> 410
So we are getting the correct aggregated sales values in the output.

HBase Table MapReduce Basics

In this post I am going to explain how we can use HBase tables as both the source and the target of a MapReduce program.

For writing MapReduce programs over HBase tables we should follow the guidelines below:

1. Mapper Class

  • The Mapper class should extend TableMapper.
  • The input key to the mapper is an ImmutableBytesWritable object holding the rowkey of the HBase row.
  • The input value is a Result object (org.apache.hadoop.hbase.client.Result) which contains the requested column families (define the required columns/column families in the Scan).

2. Reducer Class

  • The Reducer class should extend TableReducer.
  • The output key is NULL.
  • The output value is a Put (org.apache.hadoop.hbase.client.Put) object.

3. MapReduce Driver

  • Configure a Scan (org.apache.hadoop.hbase.client.Scan) object. For this scan object we can define many parameters (see the sketch after this list), such as:
    • Start row.
    • Stop row.
    • Row filter.
    • Column family(ies) to retrieve.
    • Column(s) to retrieve.
  • Define input table using TableMapReduceUtil.initTableMapperJob. In this method we can define input table, Mapper, MapOutputKey, MapOutputValue, etc.
  • Define output table using TableMapReduceUtil.initTableReducerJob. In this method we can define output table, Reducer and Partitioner.
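
To make the Scan configuration above concrete, here is a minimal sketch of setting up a Scan before passing it to TableMapReduceUtil.initTableMapperJob; the row keys and column names are just examples, not from a real table:

// Minimal Scan configuration sketch (example row keys and column names).
Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes("20130101"));  // start row (inclusive)
scan.setStopRow(Bytes.toBytes("20130103"));   // stop row (exclusive)
scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("sales")); // fetch only the column we need
scan.setCaching(500);        // rows fetched per RPC; larger values help MapReduce scans
scan.setCacheBlocks(false);  // recommended for MapReduce scans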
In my next post I shall give an example MapReduce program using HBase tables as input and output.

Monday 14 October 2013

Introduction To Hive's Partitioning

A simple query in Hive reads the entire dataset even if we have a where clause filter. This becomes a bottleneck for running MapReduce jobs over a large table. We can overcome this issue by implementing partitions in Hive. Hive makes it very easy to implement partitions by using an automatic partition scheme when the table is created.

In Hive's implementation of partitioning, data within a table is split across multiple partitions. Each partition corresponds to a particular value (or values) of the partition column(s) and is stored as a sub-directory within the table's directory on HDFS. When the table is queried, only the required partitions are read where applicable, thereby reducing the I/O and time required by the query.

Today we are going to see how we can load a csv file into a partitioned table. For this we are going to use the Airline OnTime dataset. Loading csv data into a partitioned table involves the two steps below:
  1. Load the csv file to a non-partitioned table.
  2. Load the non-partitioned table's data into the partitioned table.
We shall partition the Airline OnTime data on two columns: year and month.

1. Load the csv file to a non-partitioned table. We shall create a staging table to hold the data from the csv file. The Hive commands to create the schema and table are given below:
create schema stg_airline;
use stg_airline;

create table stg_airline.onTimePerf
(Year INT ,
Month INT ,
DayofMonth INT ,
DayOfWeek INT ,
DepTime INT ,
CRSDepTime INT ,
ArrTime INT ,
CRSArrTime INT ,
UniqueCarrier STRING ,
FlightNum INT ,
TailNum STRING ,
ActualElapsedTime INT ,
CRSElapsedTime INT ,
AirTime STRING ,
ArrDelay INT ,
DepDelay INT ,
Origin STRING ,
Dest STRING ,
Distance INT ,
TaxiIn STRING ,
TaxiOut STRING ,
Cancelled INT ,
CancellationCode STRING ,
Diverted INT ,
CarrierDelay STRING ,
WeatherDelay STRING ,
NASDelay STRING ,
SecurityDelay STRING ,
LateAircraftDelay STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
STORED AS TEXTFILE;
After creating the table, load the csv data (note: delete the header row from the csv first) into the table using the Hive command below:
LOAD DATA LOCAL INPATH "1987.csv" OVERWRITE INTO TABLE stg_airline.onTimePerf;
2. Load the non-partitioned table's data into the partitioned table. We shall now create a table partitioned by the year and month columns; the commands for this are given below:
create schema airline;
use airline;

create table airline.onTimePerf
(DayofMonth INT ,
DayOfWeek INT ,
DepTime INT ,
CRSDepTime INT ,
ArrTime INT ,
CRSArrTime INT ,
UniqueCarrier STRING ,
FlightNum INT ,
TailNum STRING ,
ActualElapsedTime INT ,
CRSElapsedTime INT ,
AirTime STRING ,
ArrDelay INT ,
DepDelay INT ,
Origin STRING ,
Dest STRING ,
Distance INT ,
TaxiIn STRING ,
TaxiOut STRING ,
Cancelled INT ,
CancellationCode STRING ,
Diverted INT ,
CarrierDelay STRING ,
WeatherDelay STRING ,
NASDelay STRING ,
SecurityDelay STRING ,
LateAircraftDelay STRING)
PARTITIONED BY (Year INT, Month INT )
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
STORED AS TEXTFILE;
To load the partitioned table we use the commands below:
SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
INSERT OVERWRITE TABLE airline.onTimePerf PARTITION(Year, Month) SELECT DayofMonth, DayOfWeek, DepTime, CRSDepTime, ArrTime, CRSArrTime, UniqueCarrier, FlightNum, TailNum, ActualElapsedTime, CRSElapsedTime, AirTime, ArrDelay, DepDelay, Origin, Dest, Distance, TaxiIn, TaxiOut, Cancelled, CancellationCode, Diverted, CarrierDelay, WeatherDelay, NASDelay, SecurityDelay, LateAircraftDelay, Year, Month FROM stg_airline.onTimePerf;
While writing an insert statement for a partitioned table, make sure that you specify the partition columns last in the select clause. The two SET commands instruct Hive to load the partitions dynamically. If you don't execute these two SET commands you will get the error below:
FAILED: SemanticException [Error 10096]: Dynamic partition strict mode requires at least one static partition column. To turn this off set hive.exec.dynamic.partition.mode=nonstrict
In my next blog post I shall describe using HCatalog in a MapReduce program.

Sunday 13 October 2013

Connecting Hive to MongoDB Using MongoStorageHandler

The mongo-hadoop connector supports the creation of MongoDB-based Hive tables and BSON-based Hive tables. Both kinds of tables can be:
  • Queried just like HDFS-based Hive tables.
  • Combined with HDFS-based Hive tables in joins and sub-queries.
Note - the current pre-built mongo-hadoop jars on the mongo-hadoop GitHub page don't contain the MongoStorageHandler class, so you need to build the jars yourself. In this post we shall use MongoStorageHandler to create a MongoDB-based Hive table. Let's create a test.users collection in MongoDB with the following data:
use test
db.users.insert({ "_id": 1, "name": "Tom", "age": 28 })
db.users.insert({ "_id": 2, "name": "Alice", "age": 18 })
db.users.insert({ "_id": 3, "name": "Bob", "age": 29 })
Now we shall create a test.users table in Hive; this is just metadata stored in Hive, while the actual data lives in MongoDB. The commands for this are given below:
create schema test;
use test;

CREATE TABLE users (id int, name string, age int)
STORED BY "com.mongodb.hadoop.hive.MongoStorageHandler"
WITH SERDEPROPERTIES('mongo.columns.mapping'='{"id":"_id","name":"name","age":"age"}')
TBLPROPERTIES ( "mongo.uri" = "mongodb://localhost:27017/test.users");
Note - specifying the BSONSerDe mappings is optional. Without them, all the field names in the Hive table are expected to be the same as in the underlying MongoDB collection; if a field name in Hive cannot be found in a MongoDB document, its value in the Hive table will be null. Now we can also query the MongoDB collection from Hive using HQL. We shall next try the INSERT INTO TABLE command in HQL to see if we can load the MongoDB collection from Hive. Let's create a temporary table in Hive, say users2, with the same schema as the users table, and insert some data into it using a flat file.
cat users.txt
101,Scott,,10
104,Jesse,52
110,Mike,32

CREATE TABLE users2 (id int , name string , age int)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
STORED AS TEXTFILE;

LOAD DATA LOCAL INPATH "users.txt" OVERWRITE INTO TABLE test.users2;
Now we shall use INSERT INTO TABLE command to insert data from Hive table users2 to users table.
INSERT INTO TABLE test.users SELECT id,name,age from test.users2;
There are some limitations of using MongoStorageHandler which are given below:
  • Using a StorageHandler will mark the table as non-native. Data cannot be loaded into the table directory from a "LOAD DATA" command. Instead, the data can only come from the result of a query.
  • Files are only stored as a single output file, not one per reduce call, which can cause lock-leasing problems.
  • INSERT INTO a StorageHandler table behaves as INSERT OVERWRITE.
In another post I shall write about using BSON files as the storage format for Hive tables.

Saturday 12 October 2013

MapReduce On Hive Tables Using HCatalog

In my last post, Introduction To Hive's Partitioning, I described how we can load csv data into a partitioned Hive table. Today we shall see how we can use HCatalog to run MapReduce on a Hive table and store the output in another Hive table.

HCatalog makes Hive metadata available to users of other Hadoop tools like Pig, MapReduce and Hive. It provides connectors for MapReduce and Pig so that users of those tools can read data from and write data to Hive’s warehouse.
HCatalog's table abstraction presents users with a relational view of data in HDFS and ensures that users need not worry about where or in what format their data is stored, so users don't need to know whether data is stored in RCFile format, text files, or sequence files.
It also provides a notification service so that workflow tools, such as Oozie, can be notified when new data becomes available in the warehouse.

HCatalog provides HCatInputFormat/HCatOutputFormat to enable MapReduce users to read/write data in Hive’s data warehouse. It allows users to read only the partitions of tables and columns that they need. And it provides the records in a convenient list format so that users do not need to parse them.

We shall see how we can use HCatalog to do a count of records for each year-month combination.
In the mapper class we get the table schema and use this schema information to read the required columns and their values.
Note - In context.write I am using a custom writable IntPair which I will describe in a separate post.

public class onTimeMapper extends Mapper<WritableComparable, HCatRecord, IntPair, IntWritable> {
    @Override
    protected void map(WritableComparable key, HCatRecord value,
     org.apache.hadoop.mapreduce.Mapper.Context context)
     throws IOException, InterruptedException {

     // Get table schema
     HCatSchema schema = HCatBaseInputFormat.getTableSchema(context);

     Integer year = new Integer(value.getString("year", schema));
     Integer month = new Integer(value.getString("month", schema));
     Integer DayofMonth = value.getInteger("dayofmonth", schema);

     context.write(new IntPair(year, month), new IntWritable(DayofMonth));
    }
}
In reducer class we create a record schema for holding the columns and their values, which is written to target Hive table.

public class onTimeReducer extends Reducer<IntPair, IntWritable, WritableComparable, HCatRecord> {
 public void reduce (IntPair key, Iterable<IntWritable> value, Context context) 
  throws IOException, InterruptedException{
  
  int count = 0; // records counter for particular year-month
  for (IntWritable s:value) {
   count++;
  }
  
  // define output record schema
  List<HCatFieldSchema> columns = new ArrayList<HCatFieldSchema>(3);
  columns.add(new HCatFieldSchema("year", HCatFieldSchema.Type.INT, ""));
  columns.add(new HCatFieldSchema("month", HCatFieldSchema.Type.INT, ""));
  columns.add(new HCatFieldSchema("flightCount", HCatFieldSchema.Type.INT,""));
  HCatSchema schema = new HCatSchema(columns);
  HCatRecord record = new DefaultHCatRecord(3);
  
  record.setInteger("year", schema, key.getFirstInt()); 
  record.set("month", schema, key.getSecondInt());
  record.set("flightCount", schema, count);
  context.write(null, record);
 }
}
Finally we write the driver class with the input/output schema and table details:

public class onTimeDriver extends Configured implements Tool{
    private static final Log log = LogFactory.getLog( onTimeDriver.class );

    public int run( String[] args ) throws Exception{
     Configuration conf = new Configuration();
     Job job = new Job(conf, "OnTimeCount");
     job.setJarByClass(onTimeDriver.class);
     job.setMapperClass(onTimeMapper.class);
     job.setReducerClass(onTimeReducer.class);

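     // Read the Hive table airline.ontimeperf through HCatalog.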
     HCatInputFormat.setInput(job, "airline", "ontimeperf");
     job.setInputFormatClass(HCatInputFormat.class);
     job.setMapOutputKeyClass(IntPair.class);
     job.setMapOutputValueClass(IntWritable.class);
     
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(DefaultHCatRecord.class);
     job.setOutputFormatClass(HCatOutputFormat.class);
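     // Write to the Hive table airline.flight_count through HCatalog (no static partition values, hence null).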
     HCatOutputFormat.setOutput(job, OutputJobInfo.create("airline", "flight_count", null));
     HCatSchema s = HCatOutputFormat.getTableSchema(job);
     HCatOutputFormat.setSchema(job, s);
     
     return (job.waitForCompletion(true)? 0:1);
    }
    
    public static void main(String[] args) throws Exception{
  int exitCode = ToolRunner.run(new onTimeDriver(), args);
  System.exit(exitCode);
 }
}
Before running this code we need to create the output table in Hive using the commands below:
create table airline.flight_count
(Year INT ,
Month INT ,
flightCount INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
STORED AS TEXTFILE;
After compiling and building the jar file, we can run this code using the hadoop jar command without any arguments. You can check the output with HQL by querying the airline.flight_count table.

Note - with the Apache setup I faced some Hive errors while running the jar from any folder except $HIVE_HOME, so if you face any issue, try placing the jar in the $HIVE_HOME folder and running it from there.
Just to verify that our MapReduce program is independent of the output table schema, we shall alter airline.flight_count to add a dummy column:
ALTER TABLE airline.flight_count ADD COLUMNS (dummy STRING);
After truncating this table we can run our MapReduce program again and see that it still runs in the same manner.