Sunday 13 October 2013

Connecting Hive to MongoDB Using MongoStorageHandler

The mongo-hadoop connector supports the creation of MongoDB-based Hive tables and BSON-based Hive tables. Both kinds of table can be:
  • Queried just like HDFS-based Hive tables.
  • Combined with HDFS-based Hive tables in joins and sub-queries.
Note - The current mongo-hadoop jars on the mongo-hadoop GitHub repository don't contain the MongoStorageHandler class, so you need to build the jars yourself.

In this post we shall use MongoStorageHandler to create a MongoDB-based Hive table. Let's create a test.users collection in MongoDB with the following data:
use test
db.users.insert({ "_id": 1, "name": "Tom", "age": 28 })
db.users.insert({ "_id": 2, "name": "Alice", "age": 18 })
db.users.insert({ "_id": 3, "name": "Bob", "age": 29 })
Now we shall create a test.users table in Hive. This creates only metadata in Hive; the actual data is stored in MongoDB. The commands for this are given below:
create schema test;
use test;

CREATE TABLE users (id int, name string, age int)
STORED BY "com.mongodb.hadoop.hive.MongoStorageHandler"
WITH SERDEPROPERTIES('mongo.columns.mapping'='{"id":"_id","name":"name","age":"age"}')
TBLPROPERTIES ( "mongo.uri" = "mongodb://localhost:27017/test.users");
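Once the table is created, it can be queried with ordinary HQL, with the rows read directly from the MongoDB collection. A couple of illustrative queries against the sample users collection above (a sketch; output depends on your running MongoDB instance):

```sql
-- Read all rows straight from the test.users collection in MongoDB.
SELECT * FROM test.users;

-- Predicates and ordering work as with any other Hive table.
SELECT name, age FROM test.users WHERE age > 20 ORDER BY age;
```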
Note - Specifying BSONSerDe mappings is optional. Without them, all field names in the Hive table are expected to match those in the underlying MongoDB collection; if a Hive field name cannot be found in a MongoDB document, its value in the Hive table will be null.

Now we can query the MongoDB collection from Hive using HQL. We shall next try the INSERT INTO TABLE command in HQL to see if we can load a MongoDB collection from Hive. Let's create a temporary table in Hive, say users2, with the same schema as the users table, and insert some data into it from a flat file.
cat users.txt
101,Scott,10
104,Jesse,52
110,Mike,32

CREATE TABLE users2 (id int , name string , age int)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
STORED AS TEXTFILE;

LOAD DATA LOCAL INPATH "users.txt" OVERWRITE INTO TABLE test.users2;
Now we shall use INSERT INTO TABLE command to insert data from Hive table users2 to users table.
INSERT INTO TABLE test.users SELECT id, name, age FROM test.users2;
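Since the table's storage is the MongoDB collection itself, the inserted rows can be verified from Hive or directly in the mongo shell. A quick check from Hive:

```sql
-- The rows written by the INSERT now live in the test.users collection in MongoDB.
SELECT id, name, age FROM test.users;
```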
There are some limitations of using MongoStorageHandler, given below:
  • Using a StorageHandler will mark the table as non-native. Data cannot be loaded into the table directory from a "LOAD DATA" command. Instead, the data can only come from the result of a query.
  • Files are only stored as a single output file, not one per reduce call. This can cause lock-leasing problems.
  • INSERT INTO a StorageHandler table behaves as INSERT OVERWRITE.
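The first limitation is why the staging pattern used above is the general workaround: LOAD DATA must target a native table, and the StorageHandler-backed table is then populated with a query. Sketched in HQL:

```sql
-- LOAD DATA against the non-native table is rejected by Hive:
-- LOAD DATA LOCAL INPATH "users.txt" INTO TABLE test.users;   -- fails

-- Instead, load into a native staging table and insert via a query:
LOAD DATA LOCAL INPATH "users.txt" OVERWRITE INTO TABLE test.users2;
INSERT INTO TABLE test.users SELECT id, name, age FROM test.users2;
```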
In a future post I shall write about using BSON files as the storage format for Hive tables.

3 comments:

  1. Hi,
    I'm unable to execute the 'INSERT INTO' step when there's more than one field. The mapreduce job fails and returns the following error:
    FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask

    But it works fine for one field.
    Please suggest some solution.

    Thanks.

    1. Hi,
      Please check your mapreduce logs for more details; the error thrown in Hive carries very little information.

    2. Hi,
      The mapreduce logs:

      Application application_1449234563003_0177 failed 2 times due to AM Container for appattempt_1449234563003_0177_000002 exited with exitCode: 1 due to: Exception from container-launch.
      Container id: container_1449234563003_0177_02_000001
      Exit code: 1
      Stack trace: ExitCodeException exitCode=1:
      at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
      at org.apache.hadoop.util.Shell.run(Shell.java:455)
      at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:702)
      at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:196)
      at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:299)
      at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:81)
      at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
      at java.util.concurrent.FutureTask.run(FutureTask.java:166)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
      at java.lang.Thread.run(Thread.java:722)
      Container exited with a non-zero exit code 1
      .Failing this attempt.. Failing the application.
