Saturday, May 24, 2014

Avro in Java and .NET

I am playing with Apache Avro which is now actively supported by Microsoft

As part of this I created a Java and a .NET application. They demonstrate the below concepts:

- Using Schema and GenericRecord (Both in .NET and Java)
- Comparing the output sizes of various Codecs (Java sample)
- Serialize in Java and consume in .NET and vice-versa.

Here is the Java Code followed by the .NET Code. Look at the main methods to begin. No warranties - Works on my laptop ;)

Note for .NET folks: it is easier if you first setup the .NET code sample available here and just add the class below to the project.

For Java, the pom is below as well.

=================== Java class ================

package com.avro.example;


import java.util.List;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;

import org.apache.avro.Schema;

import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;

/*

 *  Serialize/Deserialize interop with .NET
 */
public class AvroInteropTest {
    final static String schemaDescription = "        {\r\n" + 
    "            \"type\":\"record\",\r\n" + 
    "            \"name\":\"SensorData\",\r\n" + 
    "            \"fields\":\r\n" + 
    "                [\r\n" + 
    "                    { \r\n" + 
    "                        \"name\":\"Location\", \r\n" + 
    "                        \"type\":\r\n" + 
    "                            {\r\n" + 
    "                                \"type\":\"record\",\r\n" + 
    "                                \"name\":\"Location\",\r\n" + 
    "                                \"fields\":\r\n" + 
    "                                    [\r\n" + 
    "                                        { \"name\":\"Floor\", \"type\":\"int\" },\r\n" + 
    "                                        { \"name\":\"Room\", \"type\":\"int\" }\r\n" + 
    "                                    ]\r\n" + 
    "                            }\r\n" + 
    "                    },\r\n" + 
    "                    { \"name\":\"Value\", \"type\":\"string\" }\r\n" + 
    "                ]\r\n" + 
    "        }\r\n" + 
    "";


public static void main(String[] args) throws IOException {
// demo of reading data serialized by a .NET application (.NET->Java)
// run the CSharp program to generate the file before running this
deserializeFromFile("c:\\temp\\FromCSharpWithLove.bin");



// file for CSharp to deserialize for the Java->.NET demo
// .NET currently supports deflateCodec
serializeToFile("c:\\temp\\FromJavaWithLove.bin", CodecFactory.deflateCodec(9));



// demo of various codecs
// observe the file sizes
demoCodecs();
    }

static void demoCodecs() throws IOException{

serializeToFile("c:\\temp\\AvroDeflateCodec.bin", CodecFactory.deflateCodec(9));
serializeToFile("c:\\temp\\AvroBZip2Codec.bin", CodecFactory.bzip2Codec());
serializeToFile("c:\\temp\\AvroSnappyCodec.bin", CodecFactory.snappyCodec());
serializeToFile("c:\\temp\\AvroXZCodec.bin", CodecFactory.xzCodec(9));
}

static void serializeToFile(String fileName) throws IOException{

serializeToFile(fileName, CodecFactory.deflateCodec(9));
}

static void serializeToFile(String fileName, CodecFactory factory) throws IOException{
File file = new File(fileName);
        Schema schema = new Schema.Parser().parse(schemaDescription);
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
        dataFileWriter.setCodec(factory);
        dataFileWriter.create(schema, file);
        
        // Populate data
        List<GenericRecord> records = createSensorDataRecords(schema, 1000);
        for(GenericRecord record : records){
        dataFileWriter.append(record);
        }
        dataFileWriter.close();
                
System.out.println(String.format("==== Serialized to file %s using %s=====", fileName, factory.toString()));
System.out.println("");
}

static void deserializeFromFile(String fileName) throws IOException{

        Schema schema = new Schema.Parser().parse(schemaDescription);
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(new File(fileName), reader);
GenericRecord result = null;

System.out.println(String.format(

"==== Deserializing from file %s =====", fileName));

while (dataFileReader.hasNext()) {

result = dataFileReader.next(result);
System.out.println(result.toString());
}
}

static GenericRecord createSensorDataRecord(int floor, int room, String value, Schema schema){

        GenericRecord location = new GenericData.Record(schema.getField("Location").schema());
        location.put("Floor", floor);
        location.put("Room", room);

        GenericRecord sensorData = new GenericData.Record(schema);

        sensorData.put("Location", location);
        sensorData.put("Value", value);

        return sensorData;

}

static List<GenericRecord> createSensorDataRecords(Schema schema, int count){

ArrayList<GenericRecord> records = new ArrayList<GenericRecord>();

for (int i=0; i < count; i++){
records.add(createSensorDataRecord(i, i*10, String.format("Java-Sensor-Value-%s", new Date().toString()), schema));
}
return records;
}

}




================ pom file ===========
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>avro.java</groupId>
<artifactId>avro-example</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>avro-example-project</name>


<dependencies>

<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.7.6</version>
</dependency>
</dependencies>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.7.6</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
</project>




============= .NET  (See note above on setting up the project =============
namespace Microsoft.Hadoop.Avro.Sample
{
    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;
    using System.Runtime.Serialization;
    using Microsoft.Hadoop.Avro.Container;
    using Microsoft.Hadoop.Avro.Schema;

    public class AvroUsingGenericRecordJavaInterop
    {
        //Define the schema in JSON
        const string Schema = @"{
                            ""type"":""record"",
                            ""name"":""Microsoft.Hadoop.Avro.Specifications.SensorData"",
                            ""fields"":
                                [
                                    { 
                                        ""name"":""Location"", 
                                        ""type"":
                                            {
                                                ""type"":""record"",
                                                ""name"":""Microsoft.Hadoop.Avro.Specifications.Location"",
                                                ""fields"":
                                                    [
                                                        { ""name"":""Floor"", ""type"":""int"" },
                                                        { ""name"":""Room"", ""type"":""int"" }
                                                    ]
                                            }
                                    },
                                    { ""name"":""Value"", ""type"":""string"" }
                                ]
                        }";



        //Serialize and deserialize sample data set using Generic Record.
        //Generic Record is a special class with the schema explicitly defined in JSON.
        //All serialized data should be mapped to the fields of Generic Record,
        //which in turn will be then serialized.
        public void WriteObjectToFileUsingGenericRecords(String fileName)
        {
            //Create a generic serializer based on the schema
            var serializer = AvroSerializer.CreateGeneric(Schema);
            var rootSchema = serializer.WriterSchema as RecordSchema;

            Console.WriteLine("========= Serializing Sample Data Set ========");

            List<AvroRecord> data = CreateRecords(10, serializer, rootSchema);

            using (Stream st = new FileStream(fileName,FileMode.OpenOrCreate,FileAccess.Write,FileShare.Write))
            {
                using (var w = AvroContainer.CreateGenericWriter(Schema, st, Codec.Deflate))
                {
                    using (var writer = new SequentialWriter<object>(w, 24))
                    {
                        // Serialize the data to stream using the sequential writer
                        data.ForEach(writer.Write);
                    }
                }
            }
            Console.WriteLine("");
        }

        static List<AvroRecord> CreateRecords(int count, IAvroSerializer<object> serializer, RecordSchema rootSchema)
        {
            List<AvroRecord> list = new List<AvroRecord>();

            for (int i = 0; i < count; i++)
            {
                dynamic sensorData = CreateRecord(  i, 
                                                    i * 10, 
                                                    string.Format("C#-Sensor-Value-{0}", DateTime.Now.ToLongTimeString()), 
                                                    serializer,
                                                    rootSchema);
                list.Add(sensorData);
            }

            return list;
        }

        void ReadFromAvroFile(String fileName)
        {
            Console.WriteLine("========= Deserializing Sample Data Set ========");

            using (Stream st = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read))
            {
                using (var reader = AvroContainer.CreateGenericReader(st))
                {
                    using (var streamReader = new SequentialReader<object>(reader))
                    {
                        var results = streamReader.Objects;

                        foreach (object result in results)
                        {
                            dynamic theResult = (dynamic)result;
                            Console.WriteLine("Location {{Floor ={0}, Room={1}}}, Value={2}", 
                                                theResult.Location.Floor,
                                                theResult.Location.Room,
                                                theResult.Value);
                        }
                    }
                }
            }
        
        }

        static void Main()
        {
            AvroUsingGenericRecordJavaInterop Sample = new AvroUsingGenericRecordJavaInterop();

            //Serialization to file using Generic Record
            //Note the java application will read this and display the values
            Sample.WriteObjectToFileUsingGenericRecords(@"c:\temp\FromCSharpWithLove.bin");



            //this file came from java application, which uses the same schema to serialize the data
            //using the avro libraries
            Sample.ReadFromAvroFile(@"c:\temp\FromJavaWithLove.bin");

            Console.WriteLine("Press any key to exit.");
            Console.Read();
        }

        static dynamic CreateRecord(int floor, int room, String value, IAvroSerializer<object> serializer, RecordSchema rootSchema)
        {
            //Create a generic record to represent the data
            dynamic location = new AvroRecord(rootSchema.GetField("Location").TypeSchema);
            location.Floor = floor;
            location.Room = room;

            dynamic sensorData = new AvroRecord(serializer.WriterSchema);
            sensorData.Location = location;
            sensorData.Value = value;

            return sensorData;
        }

    }
}

Monday, February 17, 2014

Coursera: Algorithms Design And Analysis

If you are a programmer, you should refresh your data structure theory once a year (or twice). Coursera has a few free courses. I took a couple:

Algorithms: Design and Analysis, Part 1
by: Tim Roughgarden Stanford University

Algorithms, Part I
by Robert Sedgewick and Kevin Wayne Princeton University

As you progress, the theory gets overwhelming, but its worth the pain. Just do it!

Disclaimer: I didn't complete either one of the courses. But I loved them!

Java vs C# - Anders Hejlsberg interview (10 year old, but still relevant)

I keep re-reading this interview once in a while. It boils down the essence of how Anders designed C# to avoid the problems with Java. Although this was written in 2003, it is still very relevant. 

A Conversation with Anders Hejlsberg

If you have not read this before, please read all of them or at least these two:

Part II: The Trouble with Checked Exceptions

Part VII: Generics in C#, Java, and C++

Sunday, February 16, 2014

Performance Optimization and High throughput computing

If you are interested in high throughput computing, checkout these infoq sessions. All these guys are Gurus in high frequency trading. Lots of cool and practical stuff; some of it against conventional wisdom.

Martin Thompson

Todd Montgomery

Pieter Hintjens

Saturday, February 15, 2014

Wisdom of Earth

The amount of wisdom out there in the world is amazing. There are 7 billion people on Earth today. I am sure each one of us must have a little wisdom to give others. Add all the people who have passed through, so there is endless wisdom out there. Here is some I came across in the last two days: 

How being hard-headed (or as they say, knuckle headed) can make you irrelevant:
The North going Zax and South going Zax

Sun Tzu's wisdom (one of them):
"For to win one hundred victories in one hundred battles is not the acme of skill. To subdue the enemy without fighting is the acme of skill"