In this post we'll explore the Spark SQL API and see how to use it with Avro data. As stated earlier, Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine. A DataFrame is a distributed collection of data organised into named columns. We can think of a DataFrame as a table in a relational database, or as a data frame in R or Python.
Apache Avro is a very popular data serialization system, especially in the Big Data world. We'll use the spark-avro library for this. spark-avro is a handy library provided by Databricks, which helps us read and write Avro data from Spark.
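To follow along, the spark-avro package needs to be on the application's classpath. Here is a minimal sketch of the Maven dependency, assuming a Scala 2.10 build of Spark 1.x and spark-avro 2.0.1 (adjust both to match your own setup):

<dependency>
  <!-- spark-avro data source for Spark SQL; the version here is an assumption,
       match it to your Spark build -->
  <groupId>com.databricks</groupId>
  <artifactId>spark-avro_2.10</artifactId>
  <version>2.0.1</version>
</dependency>

Alternatively, spark-submit can pull the package in at launch time with --packages com.databricks:spark-avro_2.10:2.0.1.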
We begin by creating an instance of JavaSparkContext using SparkConf.
SparkConf conf = new SparkConf().setAppName("SQLApiDemo").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
Once created, this JavaSparkContext instance is used to create an instance of SQLContext, which we'll use to read, operate on, and write the Avro data.
SQLContext sqlContext = new SQLContext(sc);
Next, we use the load() API provided by SQLContext to read Avro data from a given source. load() returns a DataFrame created from the data read from the specified source. DataFrames can be constructed from a wide array of sources, such as structured data files, tables in Hive, external databases, or existing RDDs. Another advantage is that the DataFrame API is available in Scala, Java, Python, and R.
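The same load() call works for other built-in sources too. For instance, here is a minimal sketch that reads a JSON file instead, with a hypothetical path:

// "json" is a built-in Spark SQL data source; the path here is hypothetical
DataFrame people = sqlContext.load("/Users/tariq/json_data/people.json", "json");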
Here is a complete example showing how to read and work with Avro data using the DataFrame API:
import java.io.IOException;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class SQLApiDemo {

    public static void main(String[] args) throws IOException {
        SparkConf conf = new SparkConf().setAppName("SQLApiDemo").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // Read the Avro data into a DataFrame, using spark-avro as the data source
        DataFrame df = sqlContext.load("/Users/tariq/avro_data/browser.avro/", "com.databricks.spark.avro");

        df.printSchema(); // print the schema inferred from the Avro file
        df.show();        // display the first few rows
    }
}
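Writing a DataFrame back out as Avro is just as simple. Here is a minimal sketch using the save() counterpart of load(); the output path is hypothetical:

// Save the DataFrame as Avro files under the given (hypothetical) directory,
// again using spark-avro as the data source
df.save("/Users/tariq/avro_data/output/", "com.databricks.spark.avro");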
Not just this: the DataFrame API also allows us to perform various structured data processing operations. For example:
df.select("name").show();
// name
// Michael
// Andy
// Justin
// Select everybody, but increment the age by 1
df.select("name", df.col("age").plus(1)).show();
// name (age + 1)
// Michael null
// Andy 31
// Justin 20
// Select people older than 21
df.filter(df("name") > 21).show();
// age name
// 30 Andy
// Count people by age
df.groupBy("age").count().show();
// age count
// null 1
// 19 1
// 30 1
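And since Spark SQL can also act as a distributed SQL query engine, the same DataFrame can be queried with plain SQL once it is registered as a temporary table. A minimal sketch, with "people" as a hypothetical table name:

// Register the DataFrame as a temporary table so it can be queried with SQL
df.registerTempTable("people");

// Run a SQL query against it; the result is again a DataFrame
DataFrame adults = sqlContext.sql("SELECT name, age FROM people WHERE age > 21");
adults.show();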