Thursday 27 February 2014

Convert CSV data to Avro data

In one of my previous posts I explained how we can convert JSON data to Avro data and vice versa using the avro-tools command line options. Today I was trying to see what options we have for converting CSV data to Avro format; as of now we don't have any avro-tools option to accomplish this. We can either write our own Java program (a MapReduce program or a simple Java program), or we can use the SerDes available with Hive to do this quickly and without writing any code :)

To convert CSV data to Avro data using Hive we need to follow the steps below:
  1. Create a Hive table stored as textfile, specifying your CSV delimiter.
  2. Load the CSV file into the above table using the "load data" command.
  3. Create another Hive table using AvroSerDe.
  4. Insert data from the former table into the new Avro Hive table using the "insert overwrite" command.

To demonstrate this I will use the below data (student.csv):
0,38,91
0,65,28
0,78,16
1,34,96
1,78,14
1,11,43
Now execute the queries below in Hive:
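The queries were originally embedded as a gist; the sketch below reconstructs them. The avro_table name comes from the warehouse path used later in this post, while csv_table and the column names student_id, subject_id and marks are my assumptions; adjust them to your data.

-- 1. Textfile table matching the CSV layout
CREATE TABLE csv_table (student_id INT, subject_id INT, marks INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

-- 2. Load the CSV file into it
LOAD DATA LOCAL INPATH '/path/to/student.csv' OVERWRITE INTO TABLE csv_table;

-- 3. Avro-backed table using AvroSerDe
CREATE TABLE avro_table
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
TBLPROPERTIES ('avro.schema.literal'='{
  "namespace": "model",
  "type": "record",
  "name": "student_marks",
  "fields": [
    {"name": "student_id", "type": "int"},
    {"name": "subject_id", "type": "int"},
    {"name": "marks", "type": "int"}
  ]}');

-- 4. Copy the rows across; Hive rewrites them in Avro format
INSERT OVERWRITE TABLE avro_table SELECT student_id, subject_id, marks FROM csv_table;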

Now you can get the data in Avro format from the Hive warehouse folder. To dump this file to the local file system use the command below:
hadoop fs -cat /path/to/warehouse/test.db/avro_table/* > student.avro

If you want to get JSON data from this Avro file you can use the avro-tools command:
java -jar avro-tools-1.7.5.jar tojson student.avro > student.json

So we can easily convert CSV to Avro, and CSV to JSON as well, by writing just 4 HQLs.

Tuesday 25 February 2014

Implementing Custom WritableComparable

In one of my previous posts I wrote about implementing a custom Writable which can be used for values in a MapReduce program. For using a customized key in MapReduce we need to implement the WritableComparable interface.

The WritableComparable interface is just a subinterface of the Writable and java.lang.Comparable interfaces. To implement a WritableComparable we must provide a compareTo method apart from the readFields and write methods, as shown below:
public interface WritableComparable<T> extends Writable, Comparable<T>
{
    void readFields(DataInput in) throws IOException;
    void write(DataOutput out) throws IOException;
    int compareTo(T o);
}
Comparison of types is crucial for MapReduce, where there is a sorting phase during which keys are compared with one another.

The code for the IntPair class, which is used in the In-mapper Combiner Program to Calculate Average post, is given below:
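The gist for this class is no longer embedded here, so below is a minimal sketch of such a class; the field and accessor names are my assumptions:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class IntPair implements WritableComparable<IntPair> {
    private int first;
    private int second;

    // Hadoop instantiates keys via reflection, so a no-arg constructor is required.
    public IntPair() {}

    public IntPair(int first, int second) {
        set(first, second);
    }

    public void set(int first, int second) {
        this.first = first;
        this.second = second;
    }

    public int getFirst()  { return first; }
    public int getSecond() { return second; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(first);
        out.writeInt(second);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first = in.readInt();
        second = in.readInt();
    }

    @Override
    public int compareTo(IntPair tp) {
        // Compare on the first int, breaking ties with the second.
        int cmp = Integer.compare(first, tp.first);
        return (cmp != 0) ? cmp : Integer.compare(second, tp.second);
    }

    // hashCode is used by the default HashPartitioner to assign keys to reducers.
    @Override
    public int hashCode() { return 31 * first + second; }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof IntPair)) return false;
        IntPair other = (IntPair) o;
        return first == other.first && second == other.second;
    }

    @Override
    public String toString() { return first + "\t" + second; }
}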


As you can see in compareTo(IntPair tp) of the above class, an IntPair needs to be deserialized before comparison can take place. To avoid this we can implement a RawComparator, which can compare two keys just by checking their serialized representations. More on RawComparator is available in Hadoop: The Definitive Guide.

Wednesday 19 February 2014

Avro Schema Evolution

Avro can use different schemas for serialization and deserialization, and it can handle removed, added and modified fields. Thus it helps in building decoupled and robust systems.

In this post we will serialize data using this schema:
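The schema gist is not reproduced here; as a stand-in, here is a minimal sketch with one nested record, where all field names are my assumptions (kept consistent with the "Getting started with Avro" posts below):

{
  "namespace": "model",
  "type": "record",
  "name": "StudentActivity",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "university_id", "type": "int"},
    {"name": "course_details", "type": {
      "type": "record",
      "name": "CourseDetails",
      "fields": [
        {"name": "course_id", "type": "string"},
        {"name": "result_score", "type": "double"}
      ]
    }}
  ]
}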

and deserialize it using a different schema (StudentActivityNew.avsc, sketched after the points below) which has the following modifications:
  1. university_id field is removed.
  2. age field is added.
  3. result_score field is renamed to score.
Before we actually see how Avro handles these modifications I would like to mention the points below:
  • If a new field is added then it must have a default value. Also specify its type as an array of types starting with null, e.g. "type": ["null", "string"], otherwise you will get this error:
    Exception in thread "main" java.lang.NoSuchMethodError: org.codehaus.jackson.node.ValueNode.asText()Ljava/lang/String;
  • If a field is renamed then the old name must be present in the field's aliases.
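Putting these two points together, a matching sketch of the reader's schema (field names again my assumptions) looks like this; note the ["null", "int"] union with a default for the new age field, and the alias on score:

{
  "namespace": "model",
  "type": "record",
  "name": "StudentActivity",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "age", "type": ["null", "int"], "default": null},
    {"name": "course_details", "type": {
      "type": "record",
      "name": "CourseDetails",
      "fields": [
        {"name": "course_id", "type": "string"},
        {"name": "score", "type": "double", "aliases": ["result_score"]}
      ]
    }}
  ]
}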

In the Java program below we serialize data using the StudentActivity.avsc schema and deserialize it using the StudentActivityNew.avsc schema.
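The program was embedded as a gist; here is a minimal sketch of the same idea using Avro's generic API (record values are illustrative and field names follow the sketched schemas above):

import java.io.File;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class SchemaEvolutionDemo {
    public static void main(String[] args) throws Exception {
        Schema writerSchema = new Schema.Parser().parse(new File("StudentActivity.avsc"));
        Schema readerSchema = new Schema.Parser().parse(new File("StudentActivityNew.avsc"));

        // Build and serialize one record using the old (writer's) schema.
        GenericRecord course = new GenericData.Record(
                writerSchema.getField("course_details").schema());
        course.put("course_id", "C1");
        course.put("result_score", 95.5);
        GenericRecord record = new GenericData.Record(writerSchema);
        record.put("id", "S1");
        record.put("university_id", 1);
        record.put("course_details", course);

        File file = new File("StudentActivity.avro");
        DataFileWriter<GenericRecord> writer = new DataFileWriter<GenericRecord>(
                new GenericDatumWriter<GenericRecord>(writerSchema));
        writer.create(writerSchema, file);
        writer.append(record);
        writer.close();

        // Deserialize with the new (reader's) schema: university_id is dropped,
        // age falls back to its default, and result_score is matched via the alias.
        DataFileReader<GenericRecord> reader = new DataFileReader<GenericRecord>(
                file, new GenericDatumReader<GenericRecord>(writerSchema, readerSchema));
        while (reader.hasNext()) {
            System.out.println(reader.next());
        }
        reader.close();
    }
}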

On executing this code we see that Avro handles the modifications without any issues and our data is deserialized properly.

Tuesday 18 February 2014

Getting started with Avro Part2

In the previous post we used avro-tools commands to serialize and deserialize data. In this post we will use the Avro Java API to achieve the same. We will use the same sample data and schema from our previous post.

The Java code for serializing and deserializing data without generating code for the schema is given below:
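The gist is not embedded here; below is a minimal sketch using the generic API, reusing the assumed field names from the schema sketched in Part1:

import java.io.File;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class AvroWithoutCodeGeneration {
    public static void main(String[] args) throws Exception {
        // Parse the schema at runtime; no generated classes are needed.
        Schema schema = new Schema.Parser().parse(new File("StudentActivity.avsc"));

        // Build the nested record generically, field by field.
        GenericRecord course = new GenericData.Record(
                schema.getField("course_details").schema());
        course.put("course_id", "C1");
        course.put("result_score", 95.5);
        GenericRecord record = new GenericData.Record(schema);
        record.put("id", "S1");
        record.put("university_id", 1);
        record.put("course_details", course);

        // Serialize to an Avro container file.
        File file = new File("StudentActivity.avro");
        DataFileWriter<GenericRecord> writer = new DataFileWriter<GenericRecord>(
                new GenericDatumWriter<GenericRecord>(schema));
        writer.create(schema, file);
        writer.append(record);
        writer.close();

        // Deserialize: the container file carries its schema with it.
        DataFileReader<GenericRecord> reader = new DataFileReader<GenericRecord>(
                file, new GenericDatumReader<GenericRecord>());
        while (reader.hasNext()) {
            System.out.println(reader.next());
        }
        reader.close();
    }
}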


For generating the Java code from the Avro JSON schema we can use the avro-tools jar. The command for the same is given below:
java -jar avro-tools-1.7.5.jar compile schema StudentActivity.avsc <output_path>
The output path can be the source folder of the project, or we can add the generated Java class files to the Eclipse IDE manually.

The Java code for serializing and deserializing data with generated code for the schema is similar to the above code, except that previously we were assigning values to a GenericRecord and here we assign values to the generated Avro object:
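Again a minimal sketch; StudentActivity and CourseDetails stand for whatever classes avro-tools generated from your schema, so the names and setters here are assumptions based on the sketched schema:

import java.io.File;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;

public class AvroWithCodeGeneration {
    public static void main(String[] args) throws Exception {
        // Populate the objects generated by "avro-tools compile schema".
        CourseDetails course = new CourseDetails();
        course.setCourseId("C1");
        course.setResultScore(95.5);
        StudentActivity activity = new StudentActivity();
        activity.setId("S1");
        activity.setUniversityId(1);
        activity.setCourseDetails(course);

        // Serialize the generated object to an Avro container file.
        File file = new File("StudentActivity.avro");
        DataFileWriter<StudentActivity> writer = new DataFileWriter<StudentActivity>(
                new SpecificDatumWriter<StudentActivity>(StudentActivity.class));
        writer.create(activity.getSchema(), file);
        writer.append(activity);
        writer.close();

        // Deserialize back into generated objects.
        DataFileReader<StudentActivity> reader = new DataFileReader<StudentActivity>(
                file, new SpecificDatumReader<StudentActivity>(StudentActivity.class));
        while (reader.hasNext()) {
            System.out.println(reader.next());
        }
        reader.close();
    }
}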


In the next post we will see how Avro deals with schema evolution.

Getting started with Avro Part1

In our previous post we got a basic idea about Avro; in this post we will use Avro for serializing and deserializing data.

We will look at these 3 methods in which we can use Avro for serialization/deserialization:
  1. Using Avro command line tools.
  2. Using Avro Java API without code generation.
  3. Using Avro Java API with code generation.

Sample Data

We will use the sample data below (StudentActivity.json). Note that the JSON records are nested:
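The sample gist is missing from this page; records shaped like the schema sketched in the next section would look like this (the values are made-up placeholders):

{"id": "S1", "university_id": 1, "course_details": {"course_id": "C1", "result_score": 95.5}}
{"id": "S2", "university_id": 2, "course_details": {"course_id": "C2", "result_score": 78.0}}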

Defining a schema

Avro schemas are defined using JSON. The Avro schema for our sample data is defined below (StudentActivity.avsc):
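The schema gist is also missing; a minimal stand-in with one nested record (all field names are my assumptions, matching the sketch used in the schema-evolution post above) is:

{
  "namespace": "model",
  "type": "record",
  "name": "StudentActivity",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "university_id", "type": "int"},
    {"name": "course_details", "type": {
      "type": "record",
      "name": "CourseDetails",
      "fields": [
        {"name": "course_id", "type": "string"},
        {"name": "result_score", "type": "double"}
      ]
    }}
  ]
}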


1. Serialization/Deserialization using Avro command line tools

Avro provides a jar file by the name avro-tools-<version>.jar which provides many command line tools; running the jar without arguments prints the full list, which includes the fromjson, tojson, getschema and compile tools used in these posts.


For converting the JSON sample data to Avro binary format use the "fromjson" option, and for getting JSON data back from Avro files use the "tojson" option.

Command for serializing JSON
Without any compression
java -jar avro-tools-1.7.5.jar fromjson --schema-file StudentActivity.avsc StudentActivity.json > StudentActivity.avro

With snappy compression (note the additional --codec option)
java -jar avro-tools-1.7.5.jar fromjson --codec snappy --schema-file StudentActivity.avsc StudentActivity.json > StudentActivity.snappy.avro

Command for deserializing Avro data to JSON
The same command is used for deserializing both compressed and uncompressed data:
java -jar avro-tools-1.7.5.jar tojson StudentActivity.avro
java -jar avro-tools-1.7.5.jar tojson StudentActivity.snappy.avro

As an Avro data file contains the schema as well, we can retrieve it using this command:
java -jar avro-tools-1.7.5.jar getschema StudentActivity.avro
java -jar avro-tools-1.7.5.jar getschema StudentActivity.snappy.avro
In our next post we will use the Avro Java API for serialization/deserialization.

Monday 17 February 2014

Introduction to Avro

Apache Avro is a popular data serialization format and is gaining more users as many Hadoop-based tools natively support Avro for serialization and deserialization.
In this post we will understand some basics about Avro.

What is Avro?
  • Data serialization system
  • Uses JSON-based schemas
  • Uses RPC calls to send data
  • Schemas are sent during data exchange

Avro provides:
  • Rich data structures.
  • A compact, fast, binary data format.
  • A container file, to store persistent data.
  • Simple integration with many languages. Code generation is not required to read or write data files nor to use or implement RPC protocols; it is an optional optimization, only worth implementing for statically typed languages.
Avro APIs exist for these languages: Java, C, C++, C#, Python and Ruby.

Avro Schema:
Avro relies on schemas for serialization/deserialization.
When Avro data is stored in a file, its schema is stored with it, so that files may be processed later by any program.
If the program reading the data expects a different schema this can be easily resolved, since both schemas are present.
Avro supports a wide range of datatypes which are listed below:
Primitive Types
  • null: no value
  • boolean: a binary value
  • int: 32-bit signed integer
  • long: 64-bit signed integer
  • float: single precision (32-bit) IEEE 754 floating-point number
  • double: double precision (64-bit) IEEE 754 floating-point number
  • bytes: sequence of 8-bit unsigned bytes
  • string: unicode character sequence
Complex Types
Avro supports six kinds of complex types: records, enums, arrays, maps, unions and fixed. Detailed information on these complex types is available in the Avro specification.
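As a quick illustration (this snippet is my own, not from the Avro documentation), here is a record declaration that uses an array, a map, an enum, a fixed type and a union:

{
  "type": "record",
  "name": "Example",
  "fields": [
    {"name": "tags",   "type": {"type": "array", "items": "string"}},
    {"name": "counts", "type": {"type": "map", "values": "long"}},
    {"name": "grade",  "type": {"type": "enum", "name": "Grade", "symbols": ["A", "B", "C"]}},
    {"name": "id",     "type": {"type": "fixed", "name": "Id16", "size": 16}},
    {"name": "note",   "type": ["null", "string"]}
  ]
}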

Schema Resolution:
A reader of Avro data, whether from an RPC or a file, can always parse that data because its schema is provided. But that schema may not be exactly the schema that was expected. For example, if the data was written with a different version of the software than it is read, then records may have had fields added or removed.

We call the schema used to write the data the writer's schema, and the schema that the application expects the reader's schema. Differences between these should be resolved as follows:
  • It is an error if the two schemas do not match.
    To match, one of the following must hold:
    • both schemas are arrays whose item types match
    • both schemas are maps whose value types match
    • both schemas are enums whose names match
    • both schemas are fixed whose sizes and names match
    • both schemas are records with the same name
    • either schema is a union
    • both schemas have the same primitive type
    • the writer's schema may be promoted to the reader's as follows:
      • int is promotable to long, float, or double
      • long is promotable to float or double
      • float is promotable to double
  • if both are records:
    • the ordering of fields may be different: fields are matched by name.
    • schemas for fields with the same name in both records are resolved recursively.
    • if the writer's record contains a field with a name not present in the reader's record, the writer's value for that field is ignored.
    • if the reader's record schema has a field that contains a default value, and writer's schema does not have a field with the same name, then the reader should use the default value from its field.
    • if the reader's record schema has a field with no default value, and writer's schema does not have a field with the same name, an error is signalled.
  • if both are enums: if the writer's symbol is not present in the reader's enum, then an error is signalled.
  • if both are arrays: This resolution algorithm is applied recursively to the reader's and writer's array item schemas.
  • if both are maps: This resolution algorithm is applied recursively to the reader's and writer's value schemas.
  • if both are unions: the first schema in the reader's union that matches the selected writer's union schema is recursively resolved against it. If none match, an error is signalled.
  • if reader's is a union, but writer's is not: the first schema in the reader's union that matches the writer's schema is recursively resolved against it. If none match, an error is signalled.
  • if writer's is a union, but reader's is not: if the reader's schema matches the selected writer's schema, it is recursively resolved against it. If they do not match, an error is signalled.
In the next post we will see a program to serialize/deserialize some data using Avro, and we will also see how Avro handles schema evolution.