Tuesday, May 26, 2015
Loading Avro in Scala Spark (Creating an Avro RDD)
When working in Spark, we frequently need to deal with data in Avro format. While the basic steps are standard, I will share some experience and tips that can make the process less painful. Avro data in HDFS is stored in binary Avro format, and people commonly load it as "GenericData.Record".
I will show you a tip that makes the code more readable, in case you want to stick with an OO approach.
Suppose that you have a simple schema in a file "myschema.avsc" defined as follows:
{
  "type" : "record",
  "name" : "myrecord",
  "fields" : [ {
    "name" : "RecordID",
    "type" : [ "null", "int" ]
  }, {
    "name" : "RecordName",
    "type" : [ "null", "string" ]
  }, {
    "name" : "RecordSeq",
    "type" : [ "null", "long" ]
  }, {
    "name" : "RecordData",
    "type" : [ "null", {
      "type" : "array",
      "items" : "float"
    } ]
  } ]
}
This is a record with an id as a nullable int, a name as a nullable string, RecordSeq as a nullable long, and RecordData, a nullable float array containing the numerical data.
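Because every field in this schema is a ["null", T] union, each getter on the generated class can return null. A minimal Scala sketch of the null-safe access pattern (the values below are stand-ins, not tied to the generated class):

```scala
// A null union member arrives as a null boxed Java value, e.g. java.lang.Integer
val recordId: java.lang.Integer = null
// Wrapping in Option avoids NullPointerExceptions; -1 is a hypothetical default
val safeId: Int = Option(recordId).map(_.intValue).getOrElse(-1)

// A populated field unwraps normally
val recordId2: java.lang.Integer = 7
val safeId2: Int = Option(recordId2).map(_.intValue).getOrElse(-1)
```

The same `Option(...)` wrapping applies to any of the generated getters when a field may be null.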
Use the "avro-tools" library. If you don't have it, you can get it from the latest Avro release. See the following for reference:
http://www.michael-noll.com/blog/2013/03/17/reading-and-writing-avro-files-from-the-command-line
1) Generating the Java class from the schema:
Type the following command:
avro-tools compile schema myschema.avsc .
"avro-tools" takes the schema file as input, followed by the location for the generated output (in this case `pwd`, i.e. ".").
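If you only have the release jar rather than an `avro-tools` wrapper script, the equivalent invocation is the following (the version number is an assumption; substitute whatever you downloaded):

```shell
# Generates myrecord.java in the current directory from the schema file
java -jar avro-tools-1.7.7.jar compile schema myschema.avsc .
```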
The above command autogenerates the "myrecord.java" file, which begins as follows:
/**
 * Autogenerated by Avro
 *
 * DO NOT EDIT DIRECTLY
 */
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class myrecord extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":[{\"name\":\"RecordID\",\"type\":[\"null\",\"int\"]},{\"name\":\"RecordName\",\"type\":[\"null\",\"string\"]},{\"name\":\"RecordSeq\",\"type\":[\"null\",\"long\"]},{\"name\":\"RecordData\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"float\"}]}]}");
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
@Deprecated public java.lang.Integer RecordID;
@Deprecated public java.lang.CharSequence RecordName;
@Deprecated public java.lang.Long RecordSeq;
@Deprecated public java.util.List<java.lang.Float> RecordData;
/**
* Default constructor. Note that this does not initialize fields
* to their default values from the schema. If that is desired then
* one should use <code>newBuilder()</code>.
*/
public myrecord() {}
/**
* All-args constructor.
*/
public myrecord(java.lang.Integer RecordID, java.lang.CharSequence RecordName, java.lang.Long RecordSeq, java.util.List<java.lang.Float> RecordData) {
this.RecordID = RecordID;
this.RecordName = RecordName;
this.RecordSeq = RecordSeq;
this.RecordData = RecordData;
}
public org.apache.avro.Schema getSchema() { return SCHEMA$; }
// Used by DatumWriter. Applications should not call.
public java.lang.Object get(int field$) {
....
/** Gets the value of the 'RecordData' field */
public java.util.List<java.lang.Float> getRecordData() {
return RecordData;
}
/** Sets the value of the 'RecordData' field */
public myrecord.Builder setRecordData(java.util.List<java.lang.Float> value) {
validate(fields()[3], value);
this.RecordData = value;
fieldSetFlags()[3] = true;
return this;
}
...
You can compile the generated "myrecord.java" file in your IDE (the Avro libraries need to be on the classpath).
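Outside an IDE, the generated file can also be compiled from the command line; a minimal sketch, assuming the Avro jar sits in the working directory (the jar name is a placeholder for your version):

```shell
# Compiles the Avro-generated class against the Avro runtime jar
javac -cp avro-1.7.7.jar myrecord.java
```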
2) Using the generated class in your Spark Scala code:
You are now ready to load your binary Avro data and reference it using the class above:
import org.apache.avro.Schema.Parser
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroJob
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.avro.generic.GenericRecord
import org.apache.spark.SparkContext
..
def main(args: Array[String]) {
..
val sc = new SparkContext(..)
// Note: "myschema.avsc" must be on the classpath (or packaged into your jar)
// so that it gets distributed to all nodes
val recSchema = new Parser().parse(this.getClass.getClassLoader.getResourceAsStream("myschema.avsc"))
val myrecordDir = args(0) // location of your Avro data
// Create the RDD
val myAvroJob = Job.getInstance()
FileInputFormat.setInputPaths(myAvroJob, myrecordDir)
AvroJob.setInputKeySchema(myAvroJob, recSchema)
AvroJob.setOutputKeySchema(myAvroJob, recSchema)
val records = sc.newAPIHadoopRDD(myAvroJob.getConfiguration,
  classOf[AvroKeyInputFormat[myrecord]],
  classOf[AvroKey[myrecord]],
  classOf[NullWritable])
...
"records" is a pair RDD with the following signature:
val records: org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[myrecord],
org.apache.hadoop.io.NullWritable)]
i.e. a tuple of AvroKey and NullWritable.
3) Performing transformations on the RDD created above:
val myRecordOnlyRdd = records.map(x => x._1.datum) // transforms the pair RDD into an RDD of "myrecord", i.e. org.apache.spark.rdd.RDD[myrecord]
  .filter(_.getRecordData() != null) // filters using the generated getter method
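One gotcha when filtering or grouping on the string field: Avro decodes string fields as org.apache.avro.util.Utf8, which is a CharSequence, not a java.lang.String, so compare via toString. A minimal sketch (a plain CharSequence stands in for Utf8 so the snippet runs without the Avro jars):

```scala
// Stand-in for the Utf8 value returned by getRecordName()
val name: CharSequence = new java.lang.StringBuilder("myrecord-1")
val isMatch = name.toString == "myrecord-1"  // compare as String: true
val rawEquals = name == "myrecord-1"         // direct comparison: false
```

So, for example, filter with `_.getRecordName.toString == "myrecord-1"` rather than comparing the CharSequence directly.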