Wednesday, July 1, 2015

Working with Spark and Avro




Spark has received a lot of coverage in the big data community and is rapidly emerging as the framework of choice for big data and memory-intensive computation.
Spark is a general-purpose cluster computing framework. If you’ve got so much data that you need a cluster of machines to process it – then Spark provides an execution engine, programming model and set of libraries that can help you do cool and powerful things with that data.
The main reason for its existence is to supersede MapReduce, the original batch processing engine at the heart of Hadoop, and to support the types of use cases MapReduce traditionally struggled with, that is, anything requiring low latency. Spark adds the ability to work with the data in your cluster interactively, something that was impractical with MapReduce. It accomplishes this by using the pool of memory in your cluster to cache data that requires fast access.
Spark internals are the differentiating factor. Spark uses in-memory, fault-tolerant resilient distributed datasets (RDDs) and can keep inputs, intermediate results, and outputs in memory instead of on disk. This design can result in better performance for certain workloads when compared to Hadoop MapReduce, which forces jobs into a sequential map-reduce framework and incurs an I/O cost from writing intermediates out to disk. Spark’s performance enhancements are particularly applicable to iterative workloads, which are common in machine learning and low-latency querying use cases. Spark promises to speed up application development by 10-100x, to make applications more portable and extensible, and to make the applications themselves run up to 100x faster.
In this post I will describe how to handle a specific format (Avro) when using Spark.
The ecosystem around Apache Hadoop has grown at a tremendous rate. Frequently there is interplay among different components in a typical big data system. Data collected by Flume might be analyzed by Pig and Hive scripts. Data imported with Sqoop might be processed by a MapReduce program. To facilitate these and other scenarios, data produced by each component must be readily consumed by other components.
This challenge is referred to as data interoperability.
Most organizations have many silos of data in their data warehouses, ranging from plain text, time series, PDF files, JSON, and XML to many other custom formats. By far the predominant format is text.
Text formats are not compact enough, and can be painful to maintain over time. A basic binary format may be more compact, but it has the same maintenance issues as text. Furthermore, many applications need rich data types, including lists and nested records.
Therefore there has been an increasing need to enable each system to read and write a common format. Some systems might use other formats internally for performance, but whenever data is meant to be accessible to other systems a common format is used.
Avro was created to address that challenge and to facilitate data exchange across different silos. Many organizations have adopted the Avro format. Avro is a remote procedure call and data serialization framework developed within Apache's Hadoop project. It uses JSON for defining data types and protocols, and serializes data in a compact binary format. Because the schema is always stored alongside the data, each datum can be written without per-value overhead.


Spark internals overview

The RDD is the main building block of Spark. An RDD is a distributed dataset that is divided into partitions across nodes. Each of these partitions can reside in the memory or on the disk of a different machine. When Spark processes an RDD, it launches one task per partition.
Therefore it is helpful to represent our data in an efficient format. Whenever you have the power to decide how data is stored on disk, use an extensible binary format such as Avro, Parquet, Thrift, or Protocol Buffers.


Below is an illustration of a typical step of reading a file in Spark:







When using “textFile”, Spark by default creates an RDD of Strings. Applying a transformation such as “map” to split each line yields an RDD of string collections (for example RDD[Array[String]]). If we keep the data in String/text form, the memory footprint can be significant. Therefore it pays off to have our data in a more compact format, hence the rationale for using Avro.
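As a rough sketch of the step described above, the snippet below reads a small comma-separated text file with textFile and splits each line with map. The object name TextFileSketch, the sample data, and the local[*] master are assumptions made purely for illustration.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical helper object; the name and the sample data are illustrative only.
object TextFileSketch {
  // Splits every comma-separated line into an array of string fields.
  def parseLines(lines: RDD[String]): RDD[Array[String]] =
    lines.map(_.split(","))

  def main(args: Array[String]): Unit = {
    // Local mode, so the sketch runs without a cluster.
    val conf = new SparkConf().setAppName("text-file-sketch").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      // Write a tiny sample file so that textFile has something to read.
      val path = Files.createTempFile("customers", ".csv")
      Files.write(path, "1,Alice\n2,Bob\n".getBytes("UTF-8"))

      val lines: RDD[String] = sc.textFile(path.toString) // one String per line
      val fields = parseLines(lines)                       // one Array[String] per line
      fields.collect().foreach(f => println(f.mkString(" | ")))
    } finally {
      sc.stop()
    }
  }
}
```

Every field stays a String here, including the numeric ID, which is part of why a typed binary format can be attractive.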


Creating Avro schema


Suppose that you have two simple schemas in the files "Order.avsc" and "Customer.avsc", defined as follows:


Example 1
{
  "type" : "record",
  "name" : "Order",
  "fields" : [ {
    "name" : "OrderID",
    "type" : [ "null", "int" ]
  },
  {
    "name" : "Amount",
    "type" : [ "null", "float" ]
  },
  {
    "name" : "CustomerId",
    "type" : [ "null", "long" ]
  } ]
}
 


Example 2
{
  "type" : "record",
  "name" : "Customer",
  "fields" : [ {
    "name" : "CustomerID",
    "type" : [ "null", "int" ]
  },
  {
    "name" : "CustomerName",
    "type" : [ "null", "string" ]
  },
  {
    "name" : "Address",
    "type" : [ "null", "string" ]
  } ]
}





 
A handy tool for working with Avro schemas is “avro-tools”. It can be downloaded from https://avro.apache.org/docs/1.7.7/gettingstartedjava.html
A good post explaining this more in depth can be found here:
A straightforward way to verify your Avro schema is to execute the following:
avro-tools compile schema <schema file> <output directory>
For example:
avro-tools compile schema Customer.avsc customer
avro-tools compile schema Order.avsc order
The commands above generate two Java files: Customer.java under the customer folder and Order.java under the order folder.
Even if you are not working with Java, this is a good way to verify your schema. I will not describe the mechanism of populating Avro files, since it is very well described in the official Avro documentation mentioned above. Basically, once you have a valid schema, you can use it to create objects and serialize them to a data file on disk. Suppose that we have populated a large amount of data for Customer and Order.
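For readers who want a quick starting point for that last step, here is a minimal sketch using Avro's generic API: it parses a schema, writes a few records to an Avro data file, and reads them back. The inline schema string mirrors the Customer schema; the object name AvroFileSketch, the method names, the field spelling, and the sample values are assumptions of this sketch rather than part of the original example.

```scala
import java.io.File
import org.apache.avro.Schema
import org.apache.avro.file.{DataFileReader, DataFileWriter}
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}

// Hypothetical helper object, for illustration only.
object AvroFileSketch {
  val customerSchemaJson: String =
    """{
      |  "type": "record",
      |  "name": "Customer",
      |  "fields": [
      |    {"name": "CustomerID",   "type": ["null", "int"]},
      |    {"name": "CustomerName", "type": ["null", "string"]},
      |    {"name": "Address",      "type": ["null", "string"]}
      |  ]
      |}""".stripMargin

  // Parsing throws an exception if the schema is not valid.
  val schema: Schema = new Schema.Parser().parse(customerSchemaJson)

  // Writes one Avro record per (id, name, address) tuple to the given file.
  def writeCustomers(file: File, customers: Seq[(Int, String, String)]): Unit = {
    val writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord](schema))
    writer.create(schema, file)
    try {
      customers.foreach { case (id, name, address) =>
        val record = new GenericData.Record(schema)
        record.put("CustomerID", Int.box(id))
        record.put("CustomerName", name)
        record.put("Address", address)
        writer.append(record)
      }
    } finally writer.close()
  }

  // Reads the file back and returns (id, name) pairs; assumes neither field is null.
  def readCustomers(file: File): List[(Int, String)] = {
    val reader = new DataFileReader[GenericRecord](file, new GenericDatumReader[GenericRecord]())
    try {
      val result = List.newBuilder[(Int, String)]
      while (reader.hasNext) {
        val record = reader.next()
        // Strings come back as Utf8 objects, hence the toString.
        result += ((record.get("CustomerID").asInstanceOf[Int], record.get("CustomerName").toString))
      }
      result.result()
    } finally reader.close()
  }
}
```

The reader does not need to be given the schema, because the data file carries the schema it was written with.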


Creating Avro based RDD


In order to create an RDD that consists of Avro data, we need to do the following:
Example 3
import org.apache.avro.Schema.Parser
import org.apache.avro.mapreduce.AvroJob
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

// This is the standard Hadoop API's org.apache.hadoop.mapreduce.Job
val configCustomer = new Job()
// Place where the Avro customer dataset is stored
FileInputFormat.setInputPaths(configCustomer, <HDFS INPUT DIR>)
// Set the input schema for the job
val customerSchema = new Parser().parse(this.getClass.getClassLoader.getResourceAsStream("Customer.avsc"))
AvroJob.setInputKeySchema(configCustomer, customerSchema)







Create the RDD:


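Example 4
import org.apache.avro.generic.GenericData
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable

val customers = sc.newAPIHadoopRDD(configCustomer.getConfiguration,
      classOf[AvroKeyInputFormat[GenericData.Record]],
      classOf[AvroKey[GenericData.Record]],
      classOf[NullWritable])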





At this stage we have a customer pair RDD whose type [K,V] is org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[GenericData.Record], org.apache.hadoop.io.NullWritable)].
“GenericData.Record” is the default type of an Avro record. If you need to reference a specific class, use the auto-generated class as described above.


Similarly, create the RDD for Order:




Example 5

val configOrder = new Job()
// Place where the Avro order dataset is stored
FileInputFormat.setInputPaths(configOrder, <HDFS INPUT DIR>)
val orderSchema = new Parser().parse(this.getClass.getClassLoader.getResourceAsStream("Order.avsc"))
AvroJob.setInputKeySchema(configOrder, orderSchema)
val orders = sc.newAPIHadoopRDD(configOrder.getConfiguration,
      classOf[AvroKeyInputFormat[GenericData.Record]],
      classOf[AvroKey[GenericData.Record]],
      classOf[NullWritable])





 


Note that we needed a second job configuration so that we can define the Order location. At this stage we have two lazily evaluated RDDs: customers and orders. However, their format is not really suitable for direct manipulation, since each element is a key/value pair produced by the Hadoop InputFormat.
We need to “unmarshall” it by doing the following:


 
Example 6

// “datum” is a method on the AvroKey class; it unwraps the content, which by default is a GenericData.Record.
// The first field of the schema is the customer ID and the second is the customer name.
// Avro hands strings back as Utf8 objects, hence toString; the cast assumes the ID is never null.
val customerRDD = customers
  .map(x => x._1.datum)                                                     // RDD[GenericData.Record]
  .map(record => (record.get(0).asInstanceOf[Int], record.get(1).toString)) // RDD[(Int, String)]
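Because every field in the schemas above is declared as a union with "null", get may return null for any of them. One possible way to guard against that, sketched here with hypothetical helper names (RecordFields, intField, stringField, idAndName), is to wrap each lookup in an Option before building the pair:

```scala
import org.apache.avro.generic.GenericRecord

// Hypothetical helpers; they are not part of Avro or Spark.
object RecordFields {
  // Returns None when the field at the given position is null.
  def intField(record: GenericRecord, position: Int): Option[Int] =
    Option(record.get(position)).map(_.asInstanceOf[Int])

  // Avro returns strings as Utf8 (a CharSequence), so convert with toString.
  def stringField(record: GenericRecord, position: Int): Option[String] =
    Option(record.get(position)).map(_.toString)

  // Builds an (id, name) pair only when both fields are present.
  def idAndName(record: GenericRecord): Option[(Int, String)] =
    for {
      id   <- intField(record, 0)
      name <- stringField(record, 1)
    } yield (id, name)
}
```

With these helpers, the mapping step could be written as customers.map(_._1.datum).flatMap(r => RecordFields.idAndName(r)), which drops records with a missing ID or name instead of failing or silently producing a default value.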









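Similarly for Order:
Example 7
// The third field of the schema is the customer ID and the second is the order amount.
// Order.avsc declares CustomerId as a long while the customer ID in Customer.avsc is an int. Join keys must
// have the same type, so the ID is narrowed to Int here (this assumes non-null IDs that fit in an Int).
val orderRDD = orders
  .map(x => x._1.datum) // RDD[GenericData.Record]
  .map(record => (record.get(2).asInstanceOf[Long].toInt, record.get(1).asInstanceOf[Float])) // RDD[(Int, Float)]


We end up with two RDDs, customerRDD: org.apache.spark.rdd.RDD[(Int, String)] and orderRDD: org.apache.spark.rdd.RDD[(Int, Float)]. Both are keyed by the customer ID, so we can join them as follows:

Example 8
val customerOrderRDD = customerRDD.join(orderRDD) // RDD[(Int, (String, Float))]


Having created this joined RDD, we can apply transformations to its second element, which is a tuple of (String, Float) holding the customer name and the order amount.
As an example, we can find all the customer orders of more than $300:


Example 9
val filterCustomers = customerOrderRDD.filter(record => record._2._2 > 300)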
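The filter above looks at each joined record individually. If the goal is instead the total spent per customer, one option is to sum the amounts with reduceByKey before joining. The following is a minimal sketch under that assumption; the object name CustomerTotals, the method bigSpenders, and the (Int, Float) key and amount types are illustrative choices, not part of the original example.

```scala
import org.apache.spark.rdd.RDD

// Hypothetical helper object, for illustration only.
object CustomerTotals {
  // customers: (customerId, name); orders: (customerId, amount).
  // Returns (customerId, (name, total)) for customers whose total exceeds the threshold.
  def bigSpenders(customers: RDD[(Int, String)],
                  orders: RDD[(Int, Float)],
                  threshold: Float): RDD[(Int, (String, Float))] = {
    // Sum all order amounts per customer.
    val totals = orders.reduceByKey(_ + _)
    // Inner join on customerId, then keep only totals above the threshold.
    customers.join(totals)
      .filter { case (_, (_, total)) => total > threshold }
  }
}
```

Assuming an RDD of (customer ID, name) pairs and an RDD of (customer ID, amount) pairs, it could be called as CustomerTotals.bigSpenders(customerRDD, orderRDD, 300f). Reducing before the join also means that only one row per customer is shuffled into the join.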






Thus we have been able to join two different Avro-based datasets and manipulate the result.
A significant benefit of using Avro, as the example above shows, is that it is dynamic and platform independent, and allows for schema evolution. The latter is a very important aspect of today's ever-changing data management requirements in the enterprise.