I have been playing with Apache Avro, which is now actively supported by Microsoft.
As part of this, I created a Java application and a .NET application. They demonstrate the following concepts:
- Using Schema and GenericRecord (both in .NET and Java)
- Comparing the output sizes of various codecs (Java sample)
- Serializing in Java and consuming in .NET, and vice versa
Here is the Java code, followed by the .NET code. Start by looking at the main methods. No warranties — works on my laptop ;)
Note for .NET folks: it is easier if you first set up the .NET code sample available here and then just add the class below to that project.
For Java, the POM file is included below as well.
=================== Java class ================
package com.avro.example;
import java.util.List;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
/**
 * Serialize/deserialize interop with .NET using Avro {@link GenericRecord}s and a schema
 * defined in JSON.
 *
 * <p>The demo does three things:
 * <ul>
 *   <li>reads a container file written by the C# sample (.NET -&gt; Java);</li>
 *   <li>writes a deflate-compressed container file for the C# sample to read (Java -&gt; .NET);</li>
 *   <li>writes the same records with several codecs so the output file sizes can be compared.</li>
 * </ul>
 *
 * <p>The code deliberately sticks to Java 6 syntax (no try-with-resources, no diamond operator)
 * so that it compiles at the source level configured for the project.
 */
public class AvroInteropTest {

    /**
     * JSON Avro schema for a {@code SensorData} record. It holds a nested {@code Location}
     * record ({@code Floor} and {@code Room}, both ints) and a string {@code Value}.
     */
    final static String schemaDescription = " {\r\n" +
            " \"type\":\"record\",\r\n" +
            " \"name\":\"SensorData\",\r\n" +
            " \"fields\":\r\n" +
            " [\r\n" +
            " { \r\n" +
            " \"name\":\"Location\", \r\n" +
            " \"type\":\r\n" +
            " {\r\n" +
            " \"type\":\"record\",\r\n" +
            " \"name\":\"Location\",\r\n" +
            " \"fields\":\r\n" +
            " [\r\n" +
            " { \"name\":\"Floor\", \"type\":\"int\" },\r\n" +
            " { \"name\":\"Room\", \"type\":\"int\" }\r\n" +
            " ]\r\n" +
            " }\r\n" +
            " },\r\n" +
            " { \"name\":\"Value\", \"type\":\"string\" }\r\n" +
            " ]\r\n" +
            " }\r\n" +
            "";

    /**
     * Runs the interop and codec demos.
     *
     * @param args ignored
     * @throws IOException if any of the demo files cannot be read or written
     */
    public static void main(String[] args) throws IOException {
        // demo of reading data serialized by a .NET application (.NET->Java)
        // run the CSharp program to generate the file before running this
        deserializeFromFile("c:\\temp\\FromCSharpWithLove.bin");
        // file for CSharp to deserialize for the Java->.NET demo
        // .NET currently supports deflateCodec
        serializeToFile("c:\\temp\\FromJavaWithLove.bin", CodecFactory.deflateCodec(9));
        // demo of various codecs
        // observe the file sizes
        demoCodecs();
    }

    /**
     * Writes the same sample data set once per codec (deflate, bzip2, snappy, xz) so that the
     * resulting file sizes can be compared.
     *
     * @throws IOException if a file cannot be written
     */
    static void demoCodecs() throws IOException {
        serializeToFile("c:\\temp\\AvroDeflateCodec.bin", CodecFactory.deflateCodec(9));
        serializeToFile("c:\\temp\\AvroBZip2Codec.bin", CodecFactory.bzip2Codec());
        serializeToFile("c:\\temp\\AvroSnappyCodec.bin", CodecFactory.snappyCodec());
        serializeToFile("c:\\temp\\AvroXZCodec.bin", CodecFactory.xzCodec(9));
    }

    /**
     * Writes the sample data set to {@code fileName} using the deflate codec at level 9.
     *
     * @param fileName path of the Avro container file to create
     * @throws IOException if the file cannot be written
     */
    static void serializeToFile(String fileName) throws IOException {
        serializeToFile(fileName, CodecFactory.deflateCodec(9));
    }

    /**
     * Writes 1000 generated {@code SensorData} records to an Avro container file.
     *
     * @param fileName path of the Avro container file to create
     * @param factory  codec used to compress the file's data blocks
     * @throws IOException if the file cannot be created or written
     */
    static void serializeToFile(String fileName, CodecFactory factory) throws IOException {
        File file = new File(fileName);
        Schema schema = new Schema.Parser().parse(schemaDescription);
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
        try {
            // The codec must be chosen before create() opens the file.
            dataFileWriter.setCodec(factory);
            dataFileWriter.create(schema, file);
            // Populate data
            List<GenericRecord> records = createSensorDataRecords(schema, 1000);
            for (GenericRecord record : records) {
                dataFileWriter.append(record);
            }
        } finally {
            // Release the file handle even if create()/append() fails
            // (Java 6 target: no try-with-resources).
            dataFileWriter.close();
        }
        System.out.println(String.format("==== Serialized to file %s using %s=====", fileName, factory.toString()));
        System.out.println("");
    }

    /**
     * Reads every record from an Avro container file and prints it to standard output.
     *
     * @param fileName path of the Avro container file to read
     * @throws IOException if the file cannot be opened or read
     */
    static void deserializeFromFile(String fileName) throws IOException {
        Schema schema = new Schema.Parser().parse(schemaDescription);
        DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
        DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(new File(fileName), reader);
        try {
            GenericRecord result = null;
            System.out.println(String.format(
                    "==== Deserializing from file %s =====", fileName));
            while (dataFileReader.hasNext()) {
                // Passing the previous record lets the reader reuse it instead of allocating.
                result = dataFileReader.next(result);
                System.out.println(result.toString());
            }
        } finally {
            // The original never closed the reader, leaking the file handle.
            dataFileReader.close();
        }
    }

    /**
     * Builds one {@code SensorData} record with its nested {@code Location} record.
     *
     * @param floor  value for {@code Location.Floor}
     * @param room   value for {@code Location.Room}
     * @param value  value for {@code Value}
     * @param schema the parsed {@code SensorData} schema
     * @return the populated record
     */
    static GenericRecord createSensorDataRecord(int floor, int room, String value, Schema schema) {
        GenericRecord location = new GenericData.Record(schema.getField("Location").schema());
        location.put("Floor", floor);
        location.put("Room", room);
        GenericRecord sensorData = new GenericData.Record(schema);
        sensorData.put("Location", location);
        sensorData.put("Value", value);
        return sensorData;
    }

    /**
     * Generates {@code count} sample records. Record {@code i} has floor {@code i}, room
     * {@code i * 10} and a value stamped with the current time.
     *
     * @param schema the parsed {@code SensorData} schema
     * @param count  number of records to generate; zero or negative yields an empty list
     * @return a new mutable list of records
     */
    static List<GenericRecord> createSensorDataRecords(Schema schema, int count) {
        List<GenericRecord> records = new ArrayList<GenericRecord>();
        for (int i = 0; i < count; i++) {
            records.add(createSensorDataRecord(i, i * 10,
                    String.format("Java-Sensor-Value-%s", new Date().toString()), schema));
        }
        return records;
    }
}
================ pom file ===========
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>avro.java</groupId>
  <artifactId>avro-example</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>avro-example-project</name>

  <properties>
    <!-- Use an explicit source encoding instead of the platform default. -->
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.7.6</version>
    </dependency>
  </dependencies>

  <!-- Plugins must be declared under build/plugins. As direct children of <project> they are
       not part of the POM model, and Maven's POM parser reports them as unrecognised tags. -->
  <build>
    <plugins>
      <plugin>
        <!-- Generates Java classes from any schema files in src/main/avro. The sample class
             itself uses GenericRecord and does not rely on generated classes. -->
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-maven-plugin</artifactId>
        <version>1.7.6</version>
        <executions>
          <execution>
            <phase>generate-sources</phase>
            <goals>
              <goal>schema</goal>
            </goals>
            <configuration>
              <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
              <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <!-- Java 6 language level, matching the sample code. NOTE: JDK 12 and later no longer
               accept source/target 6; raise both values when building with a newer JDK. -->
          <source>1.6</source>
          <target>1.6</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
============= .NET (see the note above on setting up the project) =============
namespace Microsoft.Hadoop.Avro.Sample
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime.Serialization;
using Microsoft.Hadoop.Avro.Container;
using Microsoft.Hadoop.Avro.Schema;
/// <summary>
/// .NET half of the Avro interop sample. It writes SensorData records as generic
/// <see cref="AvroRecord"/>s for the Java program to read, and reads back the file
/// that the Java program produced.
/// </summary>
public class AvroUsingGenericRecordJavaInterop
{
    // Avro schema in JSON. The fields and types mirror the Java sample's schema;
    // here the record names are namespace-qualified.
    const string Schema = @"{
""type"":""record"",
""name"":""Microsoft.Hadoop.Avro.Specifications.SensorData"",
""fields"":
[
{
""name"":""Location"",
""type"":
{
""type"":""record"",
""name"":""Microsoft.Hadoop.Avro.Specifications.Location"",
""fields"":
[
{ ""name"":""Floor"", ""type"":""int"" },
{ ""name"":""Room"", ""type"":""int"" }
]
}
},
{ ""name"":""Value"", ""type"":""string"" }
]
}";

    /// <summary>
    /// Serializes a small sample data set (10 records) to an Avro container file using
    /// generic records and the Deflate codec.
    /// </summary>
    /// <param name="fileName">Path of the container file; an existing file is overwritten.</param>
    /// <remarks>
    /// A generic record has its schema defined explicitly in JSON. All serialized data is
    /// mapped onto the generic record's fields, which are then serialized.
    /// </remarks>
    public void WriteObjectToFileUsingGenericRecords(String fileName)
    {
        // Create a generic serializer based on the schema.
        var serializer = AvroSerializer.CreateGeneric(Schema);
        var rootSchema = serializer.WriterSchema as RecordSchema;
        Console.WriteLine("========= Serializing Sample Data Set ========");
        List<AvroRecord> data = CreateRecords(10, serializer, rootSchema);

        // FileMode.Create truncates an existing file. FileMode.OpenOrCreate would not, so a
        // shorter new container would leave stale bytes from a previous, longer file at the
        // end, and readers (e.g. the Java sample) would then fail on them.
        // FileShare.None keeps other processes out while the container is half-written.
        using (Stream st = new FileStream(fileName, FileMode.Create, FileAccess.Write, FileShare.None))
        {
            // Java reads this file as well; deflate is the codec both sides support here.
            using (var w = AvroContainer.CreateGenericWriter(Schema, st, Codec.Deflate))
            {
                using (var writer = new SequentialWriter<object>(w, 24))
                {
                    // Serialize the data to the stream using the sequential writer.
                    data.ForEach(writer.Write);
                }
            }
        }
        Console.WriteLine("");
    }

    /// <summary>
    /// Builds <paramref name="count"/> sample records. Record i has floor i, room i * 10
    /// and a value stamped with the current time.
    /// </summary>
    static List<AvroRecord> CreateRecords(int count, IAvroSerializer<object> serializer, RecordSchema rootSchema)
    {
        List<AvroRecord> list = new List<AvroRecord>();
        for (int i = 0; i < count; i++)
        {
            dynamic sensorData = CreateRecord(i,
                                              i * 10,
                                              string.Format("C#-Sensor-Value-{0}", DateTime.Now.ToLongTimeString()),
                                              serializer,
                                              rootSchema);
            list.Add(sensorData);
        }
        return list;
    }

    /// <summary>
    /// Reads every record from an Avro container file and prints its Location and Value
    /// fields to the console.
    /// </summary>
    /// <param name="fileName">Path of the container file to read.</param>
    void ReadFromAvroFile(String fileName)
    {
        Console.WriteLine("========= Deserializing Sample Data Set ========");
        using (Stream st = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read))
        {
            // The generic reader takes the writer's schema from the container file itself.
            using (var reader = AvroContainer.CreateGenericReader(st))
            {
                using (var streamReader = new SequentialReader<object>(reader))
                {
                    var results = streamReader.Objects;
                    foreach (object result in results)
                    {
                        // Generic records expose their fields through dynamic member access.
                        dynamic theResult = (dynamic)result;
                        Console.WriteLine("Location {{Floor ={0}, Room={1}}}, Value={2}",
                            theResult.Location.Floor,
                            theResult.Location.Room,
                            theResult.Value);
                    }
                }
            }
        }
    }

    /// <summary>
    /// Entry point. Writes the file for the Java sample to read, then reads the file
    /// that the Java sample wrote.
    /// </summary>
    static void Main()
    {
        AvroUsingGenericRecordJavaInterop Sample = new AvroUsingGenericRecordJavaInterop();
        // Serialization to a file using generic records.
        // The Java application reads this file and displays the values.
        Sample.WriteObjectToFileUsingGenericRecords(@"c:\temp\FromCSharpWithLove.bin");
        // This file comes from the Java application, which serializes the data with the
        // same schema fields using the Avro libraries.
        Sample.ReadFromAvroFile(@"c:\temp\FromJavaWithLove.bin");
        Console.WriteLine("Press any key to exit.");
        Console.Read();
    }

    /// <summary>
    /// Builds one SensorData generic record with its nested Location record.
    /// </summary>
    static dynamic CreateRecord(int floor, int room, String value, IAvroSerializer<object> serializer, RecordSchema rootSchema)
    {
        // The nested record is created from the Location field's own schema.
        dynamic location = new AvroRecord(rootSchema.GetField("Location").TypeSchema);
        location.Floor = floor;
        location.Room = room;
        dynamic sensorData = new AvroRecord(serializer.WriterSchema);
        sensorData.Location = location;
        sensorData.Value = value;
        return sensorData;
    }
}
}
No comments:
Post a Comment