Thursday, September 24, 2015

How to work with Avro data using Apache Spark(Spark SQL API)

We all know how cool Spark is when it comes to fast, general-purpose cluster computing. Apart from the core APIs Spark also provides a rich set of higher-level Apis which include Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming for high-throughput, fault-tolerant stream processing of live data streams.

Through this post we'll explore the Spark SQL API and see how to use it with Avro data. As stated earlier, Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as distributed SQL query engine. A DataFrame is a distributed collection of data organised into named columns. We can think of a DataFrame as a table in a relational database or as a dataframe in R or Python.

Apache Avro is a very popular data serialization system, specially in BigData world. We'll use the spark-avro library for this. spark-avro is a beautiful library provided by Databricks, which helps us in reading and writing Avro data.

We begin with creating an instance of JavaSparkContext using SparkConf.

SparkConf conf = new SparkConf().setAppName("SQLApiDemo").setMaster(
"local");

JavaSparkContext javaSparkContext = new JavaSparkContext(conf);

Once created, this JavaSparkContext instance is used to create an instance of SQLContext, which will be used to read, operate on and write the Avro data.

SQLContext sqlContext = new SQLContext(sc);

Next, we use the load() API provided by SQLContext to read some Avro data in from a given source. The load() API returns a DataFrame created out of the data read from a specified source. DataFrames can be constructed from a wide array of sources such as structured data files, tables in Hive, external databases, or existing RDDs. Another good thing about DataFrame is that the DataFrame API is available in Scala, Java, Python, and R.

Here is a complete example showing how to read and work with Avro data using DataFrame :

import org.apache.spark.SparkConf;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.api.java.JavaSparkContext;

public class SQLApiDemo {

    public static void main(String[] args) throws IOException {

        SparkConf conf = new SparkConf().setAppName("SQLApiDemo").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        DataFrame df = sqlContext.load("/Users/tariq/avro_data/browser.avro/", "com.databricks.spark.avro");
        df.schema();
        df.show();
    }
}

Not just this, DataFrame API also allows us to perform various structured data processing operations. For example :
df.select("name").show();
// name
// Michael
// Andy
// Justin

// Select everybody, but increment the age by 1
df.select("name", df.col("age").plus(1)).show();
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// Select people older than 21
df.filter(df("name") > 21).show();
// age name
// 30  Andy

// Count people by age
df.groupBy("age").count().show();
// age  count
// null 1
// 19   1
// 30   1