There has been a lot of coverage of Spark in the big data community. Spark is rapidly emerging as the framework of choice for big data and memory-intensive computation.
Spark is a general-purpose cluster computing framework. If you have so much data that you need a cluster of machines to process it, Spark provides an execution engine, a programming model and a set of libraries that can help you do cool and powerful things with that data.
The main reason for its existence is to supersede MapReduce – the original batch processing engine at the heart of Hadoop – and to support the types of use cases MapReduce traditionally struggled with: anything requiring low latency. Spark adds the ability to work with the data in your cluster interactively, something you could never do with MapReduce. It accomplishes this by using the pool of memory in your cluster to cache data that requires fast access.
Spark's internals are the differentiating factor. Spark uses in-memory, fault-tolerant resilient distributed datasets (RDDs), keeping inputs, intermediates and outputs in memory instead of on disk. These properties can result in better performance for certain workloads compared to Hadoop MapReduce, which forces every job into a rigid map-then-reduce sequence and incurs an I/O cost from writing intermediates out to disk. Spark's performance advantages apply particularly to iterative workloads, which are common in machine learning, and to low-latency querying. Its proponents claim that Spark can speed up application development by 10-100x, make applications more portable and extensible, and make the applications themselves run up to 100x faster.
In this post I will describe how to handle a specific format (Avro) when using Spark.
The ecosystem around Apache Hadoop has grown at a tremendous rate. Frequently there is interplay among different components in a typical big data system. Data collected by Flume might be analyzed by Pig and Hive scripts. Data imported with Sqoop might be processed by a MapReduce program. To facilitate these and other scenarios, data produced by each component must be readily consumed by other components.
This has posed a challenge referred to as Data Interoperability.
Most organizations have many silos of data in their data warehouses, in formats ranging from text, time series, PDF files, JSON and XML to many other custom formats. By far the predominant format is text.
Text formats are not compact enough and can be painful to maintain over time. A basic binary format may be more compact, but it has the same maintenance issues as text. Furthermore, many applications need rich data types, including lists and nested records.
Therefore there has been an increasing need to enable each system to read and write a common format. Some systems might use other formats internally for performance, but whenever data is meant to be accessible to other systems a common format is used.
Avro was created to address that challenge and to facilitate data exchange across different silos, and many organizations have adopted it. Avro is a remote procedure call and data serialization framework developed within Apache's Hadoop project. It uses JSON for defining data types and protocols, and serializes data in a compact binary format. Because the schema is always available when the data is read (an Avro data file stores it once, in its header), each datum can be written without per-value overhead: the binary encoding carries no field names or type tags.
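To make the “no overhead” point concrete, here is a minimal sketch in Scala (the language used for the Spark examples in this post). It assumes the org.apache.avro:avro library is on the classpath and that the code runs as a Scala script; the Order schema defined later in this post is inlined as a string, and the sample values are made up. It encodes one record with Avro's binary encoder and decodes it again:

```scala
import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

// The Order schema, inlined as a string for this sketch.
val orderSchema: Schema = new Schema.Parser().parse(
  """{"type": "record", "name": "Order", "fields": [
       {"name": "OrderID",    "type": ["null", "int"]},
       {"name": "Amount",     "type": ["null", "float"]},
       {"name": "CustomerId", "type": ["null", "long"]}]}""")

// Serialize a record: only the values are written, no field names or type tags.
def encode(record: GenericRecord): Array[Byte] = {
  val out = new ByteArrayOutputStream()
  val encoder = EncoderFactory.get().binaryEncoder(out, null)
  new GenericDatumWriter[GenericRecord](record.getSchema).write(record, encoder)
  encoder.flush() // the binary encoder is buffered
  out.toByteArray
}

// Deserialize: the reader must be given the schema, because the bytes do not carry it.
def decode(bytes: Array[Byte], schema: Schema): GenericRecord = {
  val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
  new GenericDatumReader[GenericRecord](schema).read(null, decoder)
}

// Hypothetical sample order.
val order = new GenericData.Record(orderSchema)
order.put("OrderID", 1)
order.put("Amount", 12.5f)
order.put("CustomerId", 42L)

val binary = encode(order)
val asJson = """{"OrderID":1,"Amount":12.5,"CustomerId":42}""".getBytes("UTF-8")
println(s"Avro binary: ${binary.length} bytes, JSON text: ${asJson.length} bytes")

val roundTrip = decode(binary, orderSchema)
```

For this record the encoder writes 9 bytes (a one-byte union branch index for each nullable field, plus the values themselves), against 43 bytes for the equivalent JSON text. The decoder can only make sense of those bytes because it is handed the schema.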
Spark internals overview
The RDD is the main building block of Spark. An RDD is a distributed dataset that is divided into partitions spread across the nodes of the cluster. Each partition can reside in memory or on disk, on different machines. To process an RDD, Spark launches one task per partition.
Since every task has to read and deserialize its own partition, it is helpful to represent our data in an efficient format. Whenever you have the power to decide how data is stored on disk, use an extensible binary format such as Avro, Parquet, Thrift or Protocol Buffers.
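A small sketch of partitions and tasks, assuming Spark's core library is on the classpath and the code runs as a Scala script in local mode; the application name, the number range and the partition count are arbitrary choices for illustration:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Local mode with 2 worker threads, purely for illustration.
val sc = new SparkContext(
  new SparkConf().setAppName("partitions-sketch").setMaster("local[2]"))

// Split 100 numbers into 4 partitions explicitly.
val numbers = sc.parallelize(1 to 100, numSlices = 4)
val numPartitions = numbers.getNumPartitions

// glom() turns each partition into an array; the collect() action below
// runs one task per partition, i.e. 4 tasks.
val partitionSizes = numbers.glom().map(_.length).collect()
println(s"$numPartitions partitions, sizes: ${partitionSizes.mkString(", ")}")

sc.stop()
```

Every action on numbers, such as the collect() above, is executed as one task per partition, so choosing fewer, larger partitions or more, smaller ones directly changes how many tasks Spark schedules.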
Below is an illustration of a typical step of reading a file in Spark:
By default, “textFile” creates an RDD of Strings, one per line. Applying a transformation such as “map” to split each line then results in an RDD of string collections (for example RDD[Array[String]]), in which every field is still a String. Keeping data in this text representation can take up a significant amount of memory, so it pays off to have our data in a more compact format; hence the rationale for using Avro.
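The following sketch shows the types involved when reading text. It assumes Spark's core library on the classpath, a Scala script environment and local mode; the temporary file and its comma-separated customer lines are hypothetical sample data:

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val sc = new SparkContext(
  new SparkConf().setAppName("textfile-sketch").setMaster("local[2]"))

// Hypothetical customer data stored as comma-separated text.
val input = Files.createTempFile("customers", ".csv")
Files.write(input, "1,Alice,12 Main St\n2,Bob,34 Oak Ave\n".getBytes("UTF-8"))

// textFile yields one String per line ...
val lines: RDD[String] = sc.textFile(input.toString)
// ... and splitting each line yields an Array[String]: every field, including
// the numeric ID, is still a String until it is parsed explicitly.
val fields: RDD[Array[String]] = lines.map(_.split(","))
val fieldCount = fields.first().length
val ids: Array[Int] = fields.map(_(0).toInt).collect()

sc.stop()
```

Every value has to be re-parsed from its String form (here with toInt) before it can be used as a number, which is part of the overhead a typed binary format such as Avro avoids.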
Creating Avro schema
Suppose that you have two simple schemas, in the files “Order.avsc” and “Customer.avsc”, defined as follows:
Example 1
|
{
  "type": "record",
  "name": "Order",
  "fields": [
    { "name": "OrderID", "type": [ "null", "int" ] },
    { "name": "Amount", "type": [ "null", "float" ] },
    { "name": "CustomerId", "type": [ "null", "long" ] }
  ]
}
|
Example 2
|
{
  "type": "record",
  "name": "Customer",
  "fields": [
    { "name": "CustomerID", "type": [ "null", "int" ] },
    { "name": "CustomerName", "type": [ "null", "string" ] },
    { "name": "Address", "type": [ "null", "string" ] }
  ]
}
|
“avro-tools” is a command-line utility, distributed as a jar, that comes in handy when dealing with Avro schemas. It can be downloaded from
https://avro.apache.org/docs/1.7.7/gettingstartedjava.html
A good post explaining this more in depth can be
found here:
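A straightforward way to verify your Avro schema is to compile it into Java code:
avro-tools compile schema <schema file> <output directory>
For example:
avro-tools compile schema Customer.avsc customer
avro-tools compile schema Order.avsc order
(If you only have the jar, run it as java -jar avro-tools-1.7.7.jar followed by the same arguments.)
These commands generate two Java files: Customer.java in the customer folder and Order.java in the order folder. The Java package of a generated class is taken from the schema's “namespace” attribute; these schemas do not declare one, so the classes are placed directly in the output folders.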
Even if you are not working with Java, this is a good way to verify your schema. I will not describe how to populate Avro files, since that is described very well in the official Avro documentation mentioned above. Basically, once you have a valid schema, you can use it to create objects and serialize them to a data file on disk. Suppose that we have populated large datasets for Customer and Order.
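For completeness, a minimal sketch of populating and reading back an Avro data file with Avro's generic API (DataFileWriter and DataFileReader). It assumes the org.apache.avro:avro library and a Scala script environment; the Customer schema is inlined as a string with the ID field spelled CustomerID, and the three customers and the temporary file are hypothetical:

```scala
import java.io.File
import scala.collection.mutable.ArrayBuffer
import org.apache.avro.Schema
import org.apache.avro.file.{DataFileReader, DataFileWriter}
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}

// Customer schema inlined; the ID field is assumed to be named "CustomerID".
val customerSchema: Schema = new Schema.Parser().parse(
  """{"type": "record", "name": "Customer", "fields": [
       {"name": "CustomerID",   "type": ["null", "int"]},
       {"name": "CustomerName", "type": ["null", "string"]},
       {"name": "Address",      "type": ["null", "string"]}]}""")

// Hypothetical sample customers.
val sample = Seq((1, "Alice", "12 Main St"), (2, "Bob", "34 Oak Ave"), (3, "Carol", null))
val file = File.createTempFile("customers", ".avro")

// Write: the schema is stored once in the file header, followed by binary records.
val writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord](customerSchema))
writer.create(customerSchema, file)
for ((id, name, address) <- sample) {
  val rec = new GenericData.Record(customerSchema)
  rec.put("CustomerID", id)
  rec.put("CustomerName", name)
  rec.put("Address", address) // null is allowed: the type is a union with "null"
  writer.append(rec)
}
writer.close()

// Read back: the reader takes the writer's schema from the file header.
val reader = new DataFileReader[GenericRecord](file, new GenericDatumReader[GenericRecord]())
val names = ArrayBuffer[String]()
while (reader.hasNext) names += reader.next().get("CustomerName").toString
reader.close()
```

The data file is self-describing: the DataFileReader is created without a schema and obtains it from the file header, which is what allows other tools to consume the same file.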
Creating an Avro-based RDD
To create an RDD that consists of Avro data, we first describe the input (its location and schema) with a Hadoop job configuration:
Example 3
|
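// Standard Hadoop and Avro APIs
import org.apache.avro.Schema
import org.apache.avro.mapreduce.AvroJob
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

// A Hadoop job (org.apache.hadoop.mapreduce.Job), used here only to carry configuration
val configCustomer = Job.getInstance()
// Place where the Avro customer dataset is stored (replace the placeholder)
FileInputFormat.setInputPaths(configCustomer, "<HDFS INPUT DIR>")
// Set the input schema for the job; Customer.avsc is loaded from the classpath
val customerSchema = new Schema.Parser().parse(
  this.getClass.getClassLoader.getResourceAsStream("Customer.avsc"))
AvroJob.setInputKeySchema(configCustomer, customerSchema)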
|
Create the RDD
Example 4
|
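import org.apache.avro.generic.GenericData
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable

// sc is the SparkContext (predefined in spark-shell)
val customers = sc.newAPIHadoopRDD(configCustomer.getConfiguration,
  classOf[AvroKeyInputFormat[GenericData.Record]],
  classOf[AvroKey[GenericData.Record]],
  classOf[NullWritable])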
|
At this stage, customers is a pair RDD whose type is org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[GenericData.Record], org.apache.hadoop.io.NullWritable)].
“GenericData.Record” is Avro's generic record type: its fields are looked up by name or position at run time, so no generated code is needed. If you need to reference a specific, typed class instead, use the class auto-generated by avro-tools as described above.
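A short sketch of what working with a GenericData.Record looks like, assuming the org.apache.avro:avro library and a Scala script environment; the Customer schema is inlined (ID field spelled CustomerID) and the values are made up:

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData

// Customer schema inlined; the ID field is assumed to be named "CustomerID".
val customerSchema: Schema = new Schema.Parser().parse(
  """{"type": "record", "name": "Customer", "fields": [
       {"name": "CustomerID",   "type": ["null", "int"]},
       {"name": "CustomerName", "type": ["null", "string"]},
       {"name": "Address",      "type": ["null", "string"]}]}""")

// A generic record is essentially the schema plus an array of values.
val rec = new GenericData.Record(customerSchema)
rec.put("CustomerID", 7)
rec.put("CustomerName", "Alice") // Address is left unset, so it stays null

// Fields can be read by position or by name.
val byPosition = rec.get(1)
val byName = rec.get("CustomerName")

// Looking the position up in the schema avoids hard-coding it.
val namePos = customerSchema.getField("CustomerName").pos()
```

Records produced by a reader, such as those coming out of AvroKeyInputFormat, return string fields as org.apache.avro.util.Utf8 rather than java.lang.String, so call toString when you need a Scala String.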
Similarly, create the RDD for Order:
Example 5
|
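// Assumes the Hadoop, Avro and Spark imports shown for the Customer RDD are in scope
val configOrder = Job.getInstance()
// Place where the Avro order dataset is stored (placeholder, replace it)
FileInputFormat.setInputPaths(configOrder, "<HDFS ORDER INPUT DIR>")
val orderSchema = new Schema.Parser().parse(
  this.getClass.getClassLoader.getResourceAsStream("Order.avsc"))
AvroJob.setInputKeySchema(configOrder, orderSchema)
val orders = sc.newAPIHadoopRDD(configOrder.getConfiguration,
  classOf[AvroKeyInputFormat[GenericData.Record]],
  classOf[AvroKey[GenericData.Record]],
  classOf[NullWritable])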
|
Note that we needed a second job configuration so that we can define the Order location and schema. At this stage we have two lazily evaluated RDDs, customers and orders. However, their format is not really suitable for direct manipulation, since each element is a (key, value) pair produced by a Hadoop InputFormat: an AvroKey wrapping the record, and an empty NullWritable value.
We need to “unmarshal” them as follows:
Example 6
|
// datum unwraps the record held by each AvroKey (a GenericData.Record here).
// Extract plain values right away: the Hadoop reader may reuse record objects.
val customerRDD = customers
  .map { case (key, _) => key.datum }               // RDD[GenericData.Record]
  // Field 0 is the customer ID (int), field 1 the customer name (string).
  // Avro returns strings as org.apache.avro.util.Utf8, hence toString; the ID is
  // widened to Long to match Order's CustomerId (long). Assumes no null values.
  .map(rec => (rec.get(0).asInstanceOf[java.lang.Integer].longValue,
               rec.get(1).toString))                // RDD[(Long, String)]
|
Similarly for Order:
Example 7
|
val orderRDD = orders
  .map { case (key, _) => key.datum }               // RDD[GenericData.Record]
  // Field 2 is the customer ID (long), field 1 the order amount (float).
  // Assumes no null values.
  .map(rec => (rec.get(2).asInstanceOf[java.lang.Long].longValue,
               rec.get(1).asInstanceOf[java.lang.Float].floatValue)) // RDD[(Long, Float)]
|
We end up with two RDDs, customerRDD: RDD[(Long, String)] and orderRDD: RDD[(Long, Float)]. Both are keyed by the customer ID, so we can join them as follows:
Example 8
|
val customerOrderRDD = customerRDD.join(orderRDD) // RDD[(Long, (String, Float))]
|
The joined RDD contains one element per matching (customer, order) pair, and its second element is a tuple of (customer name, order amount) on which we can do further transformations.
As an example, we can find all orders of more than $300, together with the name of the customer who placed them:
Example 9
|
val filterCustomers = customerOrderRDD.filter { case (_, (_, amount)) => amount > 300 }
|
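Filtering the joined RDD keeps individual (customer, order) pairs, so a customer whose several smaller orders add up to more than $300 is not selected. The sketch below contrasts the two readings. It assumes Spark's core library, a Scala script environment and local mode, and uses small hypothetical in-memory RDDs, keyed by a Long customer ID with Float amounts, in place of the Avro-backed ones:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setAppName("join-sketch").setMaster("local[2]"))

// Hypothetical stand-ins for customerRDD and orderRDD, both keyed by customer ID.
val customerRDD = sc.parallelize(Seq((1L, "Alice"), (2L, "Bob"), (3L, "Carol")))
val orderRDD = sc.parallelize(Seq((1L, 120.0f), (1L, 250.0f), (2L, 80.0f), (3L, 310.0f)))

// Per order: join first, then keep the (customer, order) pairs above $300.
val bigOrders = customerRDD.join(orderRDD)
  .filter { case (_, (_, amount)) => amount > 300 }
  .collect()

// Per customer: sum each customer's orders, then join and filter on the total.
val totals = orderRDD.reduceByKey(_ + _) // RDD[(Long, Float)]
val bigSpenders = customerRDD.join(totals)
  .filter { case (_, (_, total)) => total > 300 }
  .map { case (_, (name, _)) => name }
  .collect()

sc.stop()
```

Here only Carol has a single order above $300, while Alice also qualifies once her two orders are summed. Aggregating with reduceByKey before the join can also reduce the data the join has to shuffle, since each customer contributes one total instead of one row per order.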
Thus we have been able to join two different Avro-based datasets in order to manipulate them.
A significant gain of using Avro, as described in the example above, is that it is dynamic (generic records can be processed without generating code), platform independent, and allows for schema evolution. The latter is a very important aspect of today's ever-changing data management requirements in the enterprise.
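A minimal sketch of schema evolution with Avro's generic API, assuming the org.apache.avro:avro library and a Scala script environment. Data written with the Customer schema (inlined, ID field spelled CustomerID) is read with a newer version of the schema that adds a hypothetical Email field with a default value:

```scala
import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

// Writer schema: the Customer schema the data was originally written with.
val v1: Schema = new Schema.Parser().parse(
  """{"type": "record", "name": "Customer", "fields": [
       {"name": "CustomerID",   "type": ["null", "int"]},
       {"name": "CustomerName", "type": ["null", "string"]},
       {"name": "Address",      "type": ["null", "string"]}]}""")

// Reader schema: a later version adding a hypothetical "Email" field with a default.
val v2: Schema = new Schema.Parser().parse(
  """{"type": "record", "name": "Customer", "fields": [
       {"name": "CustomerID",   "type": ["null", "int"]},
       {"name": "CustomerName", "type": ["null", "string"]},
       {"name": "Address",      "type": ["null", "string"]},
       {"name": "Email",        "type": ["null", "string"], "default": null}]}""")

// Encode a record with the old schema ...
val old = new GenericData.Record(v1)
old.put("CustomerID", 1)
old.put("CustomerName", "Alice")
old.put("Address", "12 Main St")
val out = new ByteArrayOutputStream()
val encoder = EncoderFactory.get().binaryEncoder(out, null)
new GenericDatumWriter[GenericRecord](v1).write(old, encoder)
encoder.flush()

// ... and decode it with the new one: Avro resolves the writer schema against
// the reader schema and fills the missing "Email" field with its default.
val decoder = DecoderFactory.get().binaryDecoder(out.toByteArray, null)
val upgraded = new GenericDatumReader[GenericRecord](v1, v2).read(null, decoder)
```

Because the reader is given both the writer's schema and its own, old data stays readable after the schema changes and the new field simply takes its default. Adding a field without a default, by contrast, would make the old data unreadable with the new schema.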