Tuesday, May 26, 2015

Loading Avro in Scala Spark (Creating an Avro RDD)



Quite frequently when working in Spark we need to deal with the Avro format. While the basic steps are fairly standard, I will share my experience and a few tips that might make the process less painful. Avro data in HDFS resides in binary Avro format, and most examples load it as "GenericData.Record".
I will show you a tip that makes this more readable, in case you want to stick with an OO approach.
 
Suppose that you have a simple schema in a file "myschema.avsc" defined as follows:

{
  "type" : "record",
  "name" : "myrecord",
  "fields" : [ {
    "name" : "RecordID",
    "type" : [ "null", "int" ]
  },
  {
    "name" : "RecordName",
    "type" : [ "null", "string" ]
  },
  {
    "name" : "RecordSeq",
    "type" : [ "null", "long" ]
  }, {
    "name" : "RecordData",
    "type" : [ "null", {
      "type" : "array",
      "items" : "float"
    } ]
  } ]
}

This is a record object with RecordID as an Integer, RecordName as a String, RecordSeq as a Long, and RecordData as a float array containing some numerical data.
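For comparison, this is roughly what reading such a record looks like with the generic "GenericData.Record" API mentioned above: fields are looked up by name and come back as plain Objects, so casts are unavoidable. (A sketch only; the helper below is hypothetical and not used later in this post.)

import org.apache.avro.generic.GenericRecord

// Sketch of the generic-API style of access: string-based field lookups and casts.
def idAndName(rec: GenericRecord): (Integer, String) = {
  val id   = rec.get("RecordID").asInstanceOf[Integer]         // union ["null","int"] -> Integer or null
  val name = rec.get("RecordName").asInstanceOf[CharSequence]  // Avro strings arrive as Utf8/CharSequence
  (id, if (name == null) null else name.toString)
}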
Use "avro-tools" library. if you don't have it,you can get it from latest Avro release. See following for reference:
http://www.michael-noll.com/blog/2013/03/17/reading-and-writing-avro-files-from-the-command-line


1) Generating a Java file from the schema:
Type the following command:
avro-tools compile schema myschema.avsc .
"avro-tools" takes as an input schema file and location of output result ( in this case  `pwd` or ".")

The above command autogenerates a "myrecord.java" file (abridged below):


/**
 * Autogenerated by Avro
 *
 * DO NOT EDIT DIRECTLY
 */
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class myrecord extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":[{\"name\":\"RecordID\",\"type\":[\"null\",\"int\"]},{\"name\":\"RecordName\",\"type\":[\"null\",\"string\"]},{\"name\":\"RecordSeq\",\"type\":[\"null\",\"long\"]},{\"name\":\"RecordData\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"float\"}]}]}");
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
  @Deprecated public java.lang.Integer RecordID;
  @Deprecated public java.lang.CharSequence RecordName;
  @Deprecated public java.lang.Long RecordSeq;
  @Deprecated public java.util.List<java.lang.Float> RecordData;

  /**
   * Default constructor.  Note that this does not initialize fields
   * to their default values from the schema.  If that is desired then
   * one should use <code>newBuilder()</code>.
   */
  public myrecord() {}

  /**
   * All-args constructor.
   */
  public myrecord(java.lang.Integer RecordID, java.lang.CharSequence RecordName, java.lang.Long RecordSeq, java.util.List<java.lang.Float> RecordData) {
    this.RecordID = RecordID;
    this.RecordName = RecordName;
    this.RecordSeq = RecordSeq;
    this.RecordData = RecordData;
  }

  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
  // Used by DatumWriter.  Applications should not call.
  public java.lang.Object get(int field$) {

  ....
/** Gets the value of the 'RecordData' field */
    public java.util.List<java.lang.Float> getRecordData() {
      return RecordData;
    }

    /** Sets the value of the 'RecordData' field */
    public myrecord.Builder setRecordData(java.util.List<java.lang.Float> value) {
      validate(fields()[3], value);
      this.RecordData = value;
      fieldSetFlags()[3] = true;
      return this;
    }
...

You can compile the generated "myrecord.java" file in your IDE (the Avro libraries need to be on the classpath).
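Once it compiles, the generated class can be used from Scala like any other Java class. A quick sanity check with made-up values, just to exercise the builder and the typed getters shown above:

import java.util.Arrays

// Build a record through the generated builder and read it back via the getters.
// The values are made up; the point is the typed API instead of string lookups and casts.
val rec: myrecord = myrecord.newBuilder()
  .setRecordID(1)
  .setRecordName("first record")
  .setRecordSeq(100L)
  .setRecordData(Arrays.asList(Float.box(1.5f), Float.box(2.5f)))
  .build()

println(rec.getRecordID)          // 1
println(rec.getRecordName)        // first record
println(rec.getRecordData.size()) // 2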

2) Using the generated class in your Spark Scala code:

You are now ready to load your binary Avro data and reference it by using the class above:



import org.apache.avro.Schema.Parser
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroJob
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.spark.SparkContext
..
def main(args: Array[String]) {
..
  val sc = new SparkContext(..)

  // Note: the schema file needs to be on the classpath (or packaged in your jar) so that it gets distributed to all nodes
  val recSchema = new Parser().parse(this.getClass.getClassLoader.getResourceAsStream("myschema.avsc"))

  val myrecordDir = args(0)  // location of your Avro data

  // Create the RDD
  val myAvroJob = new Job()   // on newer Hadoop versions, prefer Job.getInstance()
  FileInputFormat.setInputPaths(myAvroJob, myrecordDir)
  AvroJob.setInputKeySchema(myAvroJob, recSchema)
  AvroJob.setOutputKeySchema(myAvroJob, recSchema)  // only needed if you also write Avro output
  val records = sc.newAPIHadoopRDD(myAvroJob.getConfiguration,
    classOf[AvroKeyInputFormat[myrecord]],
    classOf[AvroKey[myrecord]],
    classOf[NullWritable])
  ...

"records" is an Pair RDD with the following signature:
val myAvroJob: org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[myrecord],
 org.apache.hadoop.io.NullWritable)]  which is a Tuple of AvroKey and NullWritable
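At this point it can be worth peeking at a few records to confirm that the schema really matches the data. A small sanity-check sketch (plain values are extracted before collecting, since Hadoop input formats may reuse record objects):

// Pull a few plain values out of the AvroKey-wrapped records and bring them to the driver.
records.map { case (key, _) =>
         val r = key.datum()
         (r.getRecordID, Option(r.getRecordName).map(_.toString).orNull, r.getRecordSeq)
       }
       .take(5)
       .foreach(println)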

3) Performing transformations on the RDD created above


    val myRecordOnlyRdd = records.map(x => x._1.datum)              // transforms the pair RDD into an RDD of "myrecord", i.e. org.apache.spark.rdd.RDD[myrecord]
                                 .filter(_.getRecordData() != null) // filters using the generated getter
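From here it is ordinary RDD code. As one hypothetical follow-up (the aggregation itself is made up for illustration), you could sum each record's float array and key the result by RecordName:

import scala.collection.JavaConverters._

// Hypothetical aggregation: sum each record's float array, keyed by record name.
// getRecordData returns a java.util.List, hence the JavaConverters import.
// (On Spark versions before 1.3, reduceByKey also needs "import org.apache.spark.SparkContext._".)
val sumsByName = myRecordOnlyRdd
  .map { r =>
    val name = Option(r.getRecordName).map(_.toString).getOrElse("unknown")
    val sum  = r.getRecordData.asScala.map(_.floatValue).sum
    (name, sum)
  }
  .reduceByKey(_ + _)

sumsByName.take(10).foreach(println)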