Tuesday, May 26, 2015
Loading Avro in Scala Spark (Creating an Avro RDD)
Quite frequently when working in Spark we need to deal with the Avro format. While the basic steps are fairly standard, I will share my experience and a few tips that might make the process less painful. Avro data in HDFS is stored in binary Avro format, and it is commonly loaded with "GenericData.Record".
I will show you a tip that makes this more readable, in case you want to stick with an object-oriented (OO) approach.
Suppose that you have a simple schema in a file "myschema.avsc" defined as follows:
{
  "type" : "record",
  "name" : "myrecord",
  "fields" : [ {
    "name" : "RecordID",
    "type" : [ "null", "int" ]
  }, {
    "name" : "RecordName",
    "type" : [ "null", "string" ]
  }, {
    "name" : "RecordSeq",
    "type" : [ "null", "long" ]
  }, {
    "name" : "RecordData",
    "type" : [ "null", {
      "type" : "array",
      "items" : "float"
    } ]
  } ]
}
This is a record object with RecordID as an Integer, RecordName as a String, RecordSeq as a Long, and RecordData as a float array containing some numerical data.
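For comparison, here is a minimal sketch of the generic approach mentioned above, reading a local file via "GenericData.Record"-style access (the file name "mydata.avro" is made up for illustration):

import java.io.File
import org.apache.avro.Schema
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

// Illustration only: read Avro records generically; fields come back as untyped Objects
val schema = new Schema.Parser().parse(new File("myschema.avsc"))
val reader = new DataFileReader[GenericRecord](new File("mydata.avro"), new GenericDatumReader[GenericRecord](schema))
while (reader.hasNext) {
  val rec = reader.next()
  println(s"${rec.get("RecordID")} : ${rec.get("RecordName")}")
}
reader.close()

With the generated class shown below, the same fields are accessed through typed getters instead of string lookups.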
Use "avro-tools" library. if you don't have it,you can get it from latest Avro release. See following for reference:
http://www.michael-noll.com/blog/2013/03/17/reading-and-writing-avro-files-from-the-command-line
1) Generating a Java file from the schema:
Type the following command:
avro-tools compile schema myschema.avsc .
"avro-tools" takes as an input schema file and location of output result ( in this case `pwd` or ".")
The above command autogenerates the "myrecord.java" file as follows:
/**
 * Autogenerated by Avro
 *
 * DO NOT EDIT DIRECTLY
 */
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class myrecord extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":[{\"name\":\"RecordID\",\"type\":[\"null\",\"int\"]},{\"name\":\"RecordName\",\"type\":[\"null\",\"string\"]},{\"name\":\"RecordSeq\",\"type\":[\"null\",\"long\"]},{\"name\":\"RecordData\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"float\"}]}]}");
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
@Deprecated public java.lang.Integer RecordID;
@Deprecated public java.lang.CharSequence RecordName;
@Deprecated public java.lang.Long RecordSeq;
@Deprecated public java.util.List<java.lang.Float> RecordData;
/**
* Default constructor. Note that this does not initialize fields
* to their default values from the schema. If that is desired then
* one should use <code>newBuilder()</code>.
*/
public myrecord() {}
/**
* All-args constructor.
*/
public myrecord(java.lang.Integer RecordID, java.lang.CharSequence RecordName, java.lang.Long RecordSeq, java.util.List<java.lang.Float> RecordData) {
this.RecordID = RecordID;
this.RecordName = RecordName;
this.RecordSeq = RecordSeq;
this.RecordData = RecordData;
}
public org.apache.avro.Schema getSchema() { return SCHEMA$; }
// Used by DatumWriter. Applications should not call.
public java.lang.Object get(int field$) {
....
/** Gets the value of the 'RecordData' field */
public java.util.List<java.lang.Float> getRecordData() {
return RecordData;
}
/** Sets the value of the 'RecordData' field */
public myrecord.Builder setRecordData(java.util.List<java.lang.Float> value) {
validate(fields()[3], value);
this.RecordData = value;
fieldSetFlags()[3] = true;
return this;
}
...
You can compile the generated "myrecord.java" file in your IDE (the Avro libraries need to be on the classpath).
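Once compiled, the class behaves like any other Java bean. A quick sketch of constructing an instance through the generated builder (the values are made up for illustration):

// Illustration only: building a myrecord instance via the generated builder
val rec = myrecord.newBuilder()
  .setRecordID(1)
  .setRecordName("example")
  .setRecordSeq(100L)
  .setRecordData(java.util.Arrays.asList[java.lang.Float](1.0f, 2.5f))
  .build()
println(rec.getRecordName)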
2) Using generated file in your Spark Scala code:
You are now ready to load your binary Avro data and reference it by using the class above:
import org.apache.avro.Schema.Parser
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroJob
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.spark.SparkContext
..
def main(args: Array[String]) {
..
val sc = new SparkContext(..)
//Note: the schema file needs to be on the classpath or packaged in your jar so that it is distributed to all nodes
val recSchema=new Parser().parse(this.getClass.getClassLoader.getResourceAsStream("myschema.avsc"))
val myrecordDir = args(0) //location of your Avro data
//Create RDD
val myAvroJob = new Job()
FileInputFormat.setInputPaths(myAvroJob, myrecordDir)
AvroJob.setInputKeySchema(myAvroJob, recSchema)
AvroJob.setOutputKeySchema(myAvroJob, recSchema)
val records = sc.newAPIHadoopRDD(myAvroJob.getConfiguration,
  classOf[AvroKeyInputFormat[myrecord]],
  classOf[AvroKey[myrecord]],
  classOf[NullWritable])
...
"records" is an Pair RDD with the following signature:
val myAvroJob: org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[myrecord],
org.apache.hadoop.io.NullWritable)] which is a Tuple of AvroKey and NullWritable
3) Performing transformations on the RDD created above:
val myRecordOnlyRdd = records.map(x => x._1.datum) //Transforms the pair RDD into an RDD of "myrecord", i.e. org.apache.spark.rdd.RDD[myrecord]
  .filter(_.getRecordData() != null) //filters out records whose RecordData field is null, using the generated getter
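From here the usual RDD operations apply. A hypothetical continuation (the aggregation below is made up for illustration) that sums each record's float array, converting the Java list with JavaConverters:

import scala.collection.JavaConverters._

// Illustration only: total of RecordData per record name
val totals = myRecordOnlyRdd
  .filter(_.getRecordName() != null)
  .map(r => (r.getRecordName().toString, r.getRecordData().asScala.map(_.floatValue).sum))
totals.take(10).foreach(println)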