Saturday, May 24, 2014

Avro in Java and .NET

I have been playing with Apache Avro, which is now actively supported by Microsoft.

As part of this, I created a Java application and a .NET application that demonstrate the following concepts:

- Using Schema and GenericRecord (in both .NET and Java)
- Comparing the output sizes of various codecs (Java sample)
- Serializing in Java and deserializing in .NET, and vice versa

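Here is the Java code, followed by the .NET code. Start with the main methods. Both programs read and write files in c:\temp, so create that folder first. Because each program reads the file the other one writes, run the C# program first (its read step fails until the Java file exists), then the Java program, and then the C# program again. No warranties – works on my laptop ;)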

Note for .NET folks: it is easiest to first set up the .NET code sample available here and then add the class below to that project.

For Java, the Maven pom file is also included below.

=================== Java class ================

package com.avro.example;


import java.util.List;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;

import org.apache.avro.Schema;

import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;

/**
 * Serialize/deserialize interop with .NET.
 */
public class AvroInteropTest {
    final static String schemaDescription = "        {\r\n" + 
    "            \"type\":\"record\",\r\n" + 
    "            \"name\":\"SensorData\",\r\n" + 
    "            \"fields\":\r\n" + 
    "                [\r\n" + 
    "                    { \r\n" + 
    "                        \"name\":\"Location\", \r\n" + 
    "                        \"type\":\r\n" + 
    "                            {\r\n" + 
    "                                \"type\":\"record\",\r\n" + 
    "                                \"name\":\"Location\",\r\n" + 
    "                                \"fields\":\r\n" + 
    "                                    [\r\n" + 
    "                                        { \"name\":\"Floor\", \"type\":\"int\" },\r\n" + 
    "                                        { \"name\":\"Room\", \"type\":\"int\" }\r\n" + 
    "                                    ]\r\n" + 
    "                            }\r\n" + 
    "                    },\r\n" + 
    "                    { \"name\":\"Value\", \"type\":\"string\" }\r\n" + 
    "                ]\r\n" + 
    "        }\r\n" + 
    "";


public static void main(String[] args) throws IOException {
// demo of reading data serialized by a .NET application (.NET->Java)
// run the CSharp program to generate the file before running this
deserializeFromFile("c:\\temp\\FromCSharpWithLove.bin");



// file for CSharp to deserialize for the Java->.NET demo
// .NET currently supports deflateCodec
serializeToFile("c:\\temp\\FromJavaWithLove.bin", CodecFactory.deflateCodec(9));



// demo of various codecs
// observe the file sizes
demoCodecs();
    }

static void demoCodecs() throws IOException{

serializeToFile("c:\\temp\\AvroDeflateCodec.bin", CodecFactory.deflateCodec(9));
serializeToFile("c:\\temp\\AvroBZip2Codec.bin", CodecFactory.bzip2Codec());
serializeToFile("c:\\temp\\AvroSnappyCodec.bin", CodecFactory.snappyCodec());
serializeToFile("c:\\temp\\AvroXZCodec.bin", CodecFactory.xzCodec(9));
}

static void serializeToFile(String fileName) throws IOException{

serializeToFile(fileName, CodecFactory.deflateCodec(9));
}

static void serializeToFile(String fileName, CodecFactory factory) throws IOException{
File file = new File(fileName);
        Schema schema = new Schema.Parser().parse(schemaDescription);
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
        dataFileWriter.setCodec(factory);
        dataFileWriter.create(schema, file);
        
        // Populate data
        List<GenericRecord> records = createSensorDataRecords(schema, 1000);
        for(GenericRecord record : records){
        dataFileWriter.append(record);
        }
        dataFileWriter.close();
                
System.out.println(String.format("==== Serialized to file %s using %s=====", fileName, factory.toString()));
System.out.println("");
}

static void deserializeFromFile(String fileName) throws IOException{

        Schema schema = new Schema.Parser().parse(schemaDescription);
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(new File(fileName), reader);
GenericRecord result = null;

System.out.println(String.format(

"==== Deserializing from file %s =====", fileName));

while (dataFileReader.hasNext()) {

result = dataFileReader.next(result);
System.out.println(result.toString());
}
}

static GenericRecord createSensorDataRecord(int floor, int room, String value, Schema schema){

        GenericRecord location = new GenericData.Record(schema.getField("Location").schema());
        location.put("Floor", floor);
        location.put("Room", room);

        GenericRecord sensorData = new GenericData.Record(schema);

        sensorData.put("Location", location);
        sensorData.put("Value", value);

        return sensorData;

}

static List<GenericRecord> createSensorDataRecords(Schema schema, int count){

ArrayList<GenericRecord> records = new ArrayList<GenericRecord>();

for (int i=0; i < count; i++){
records.add(createSensorDataRecord(i, i*10, String.format("Java-Sensor-Value-%s", new Date().toString()), schema));
}
return records;
}

}




================ pom file ===========
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>avro.java</groupId>
    <artifactId>avro-example</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>avro-example-project</name>

    <dependencies>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.7.6</version>
        </dependency>
    </dependencies>

    <!-- Plugins must live under build/plugins; as direct children of <project> Maven rejects them. -->
    <build>
        <plugins>
            <!-- Generates classes from .avsc files under src/main/avro. AvroInteropTest uses
                 GenericRecord and does not need generated classes.
                 NOTE(review): this execution may fail if src/main/avro does not exist - confirm,
                 or remove the plugin. -->
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.7.6</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <!-- Pin the version so builds are reproducible. -->
                <version>3.1</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>




============= .NET (see the note above on setting up the project) =============
namespace Microsoft.Hadoop.Avro.Sample
{
    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;
    using System.Runtime.Serialization;
    using Microsoft.Hadoop.Avro.Container;
    using Microsoft.Hadoop.Avro.Schema;

    /// <summary>
    /// Writes and reads Avro container files with generic records, so the files can be
    /// exchanged with the Java AvroInteropTest sample.
    /// </summary>
    public class AvroUsingGenericRecordJavaInterop
    {
        // Avro schema in JSON. The field names and types match the Java sample's schema.
        // The record names here are namespace-qualified.
        const string Schema = @"{
                            ""type"":""record"",
                            ""name"":""Microsoft.Hadoop.Avro.Specifications.SensorData"",
                            ""fields"":
                                [
                                    { 
                                        ""name"":""Location"", 
                                        ""type"":
                                            {
                                                ""type"":""record"",
                                                ""name"":""Microsoft.Hadoop.Avro.Specifications.Location"",
                                                ""fields"":
                                                    [
                                                        { ""name"":""Floor"", ""type"":""int"" },
                                                        { ""name"":""Room"", ""type"":""int"" }
                                                    ]
                                            }
                                    },
                                    { ""name"":""Value"", ""type"":""string"" }
                                ]
                        }";

        /// <summary>
        /// Serializes 10 sample SensorData generic records to a deflate-compressed Avro
        /// container file. An AvroRecord carries its schema explicitly, and each field of
        /// the data is mapped onto it before serialization.
        /// </summary>
        /// <param name="fileName">Path of the file to write. An existing file is truncated.</param>
        public void WriteObjectToFileUsingGenericRecords(String fileName)
        {
            // Create a generic serializer based on the schema.
            var serializer = AvroSerializer.CreateGeneric(Schema);
            var rootSchema = serializer.WriterSchema as RecordSchema;

            Console.WriteLine("========= Serializing Sample Data Set ========");

            List<AvroRecord> data = CreateRecords(10, serializer, rootSchema);

            // FileMode.Create truncates an existing file. OpenOrCreate does not, which would leave
            // stale bytes from a previous, longer file after the new container data.
            // FileShare.None keeps other writers out while the file is being produced.
            using (Stream st = new FileStream(fileName, FileMode.Create, FileAccess.Write, FileShare.None))
            using (var w = AvroContainer.CreateGenericWriter(Schema, st, Codec.Deflate))
            // NOTE(review): 24 is presumably the number of objects per sync block; confirm against SequentialWriter docs.
            using (var writer = new SequentialWriter<object>(w, 24))
            {
                // Serialize the data to the stream using the sequential writer.
                data.ForEach(writer.Write);
            }
            Console.WriteLine("");
        }

        /// <summary>
        /// Builds <paramref name="count"/> sample records with Floor = i, Room = i * 10
        /// and a timestamped Value.
        /// </summary>
        static List<AvroRecord> CreateRecords(int count, IAvroSerializer<object> serializer, RecordSchema rootSchema)
        {
            List<AvroRecord> list = new List<AvroRecord>();

            for (int i = 0; i < count; i++)
            {
                dynamic sensorData = CreateRecord(  i,
                                                    i * 10,
                                                    string.Format("C#-Sensor-Value-{0}", DateTime.Now.ToLongTimeString()),
                                                    serializer,
                                                    rootSchema);
                list.Add(sensorData);
            }

            return list;
        }

        /// <summary>
        /// Prints every record in an Avro container file, using the writer schema stored in the
        /// file itself.
        /// </summary>
        /// <param name="fileName">Path of an existing Avro container file.</param>
        void ReadFromAvroFile(String fileName)
        {
            Console.WriteLine("========= Deserializing Sample Data Set ========");

            using (Stream st = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read))
            using (var reader = AvroContainer.CreateGenericReader(st))
            using (var streamReader = new SequentialReader<object>(reader))
            {
                foreach (object result in streamReader.Objects)
                {
                    // Generic records are accessed dynamically by field name.
                    dynamic theResult = (dynamic)result;
                    Console.WriteLine("Location {{Floor ={0}, Room={1}}}, Value={2}",
                                        theResult.Location.Floor,
                                        theResult.Location.Room,
                                        theResult.Value);
                }
            }
        }

        /// <summary>
        /// Writes the file for the Java sample to read, then reads the file the Java sample
        /// wrote. The read step throws if the Java program has not run yet.
        /// </summary>
        static void Main()
        {
            AvroUsingGenericRecordJavaInterop Sample = new AvroUsingGenericRecordJavaInterop();

            // Serialization to file using Generic Record.
            // The Java application reads this file and displays the values.
            Sample.WriteObjectToFileUsingGenericRecords(@"c:\temp\FromCSharpWithLove.bin");

            // This file comes from the Java application, which serializes the data with the
            // same field layout using the Java Avro libraries.
            Sample.ReadFromAvroFile(@"c:\temp\FromJavaWithLove.bin");

            Console.WriteLine("Press any key to exit.");
            Console.Read();
        }

        /// <summary>
        /// Builds one SensorData generic record with a nested Location record.
        /// </summary>
        /// <returns>An AvroRecord, typed as dynamic so fields can be set by name.</returns>
        static dynamic CreateRecord(int floor, int room, String value, IAvroSerializer<object> serializer, RecordSchema rootSchema)
        {
            // Create a generic record to represent the data.
            dynamic location = new AvroRecord(rootSchema.GetField("Location").TypeSchema);
            location.Floor = floor;
            location.Room = room;

            dynamic sensorData = new AvroRecord(serializer.WriterSchema);
            sensorData.Location = location;
            sensorData.Value = value;

            return sensorData;
        }

    }
}

No comments:

Post a Comment