Spark Succinctly®
by Marko Švaljek

CHAPTER 5

Data Input and Output with Spark


By now, we have gone over a fair share of concepts and recipes for data processing in Spark. Until now, we have mainly written data to text files on disk or simply used the interactive shells for Scala and Python. The interactive shells and regular text files are perfectly fine for learning, and there is nothing wrong with this approach. But sooner or later you will have to read data from some standard input source or store it in a way that other processes in the environment can use. Spark supports many input and output formats. We won’t go over every single one of them, but we will mention the most important ones. In this chapter we are also going to cover how Spark can store data in a popular storage technology, Cassandra. Let’s start with simple text files.

Working with Text Files

We have already worked with examples that used text files as input and output. As a small refresher, let’s have a look at how to make an RDD whose elements are the lines of a text file:

Code Listing 92: Text File Loaded as RDD in Scala

scala> val input = sc.textFile("README.md")

input: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[31] at textFile at <console>:21

Code Listing 93: Text File Loaded as RDD in Python

>>> input = sc.textFile("README.md")

Code Listing 94: Text File Loaded as RDD in Java

JavaRDD<String> inputFile = context.textFile("README.md");

We went over how to use the created RDD in the word count examples earlier in the book, so we don’t need to go into details right now. However, there is one very useful feature: sometimes the data won’t fit into a single file. For instance, if we have really large logs where a new log file is created every now and then, we can use wildcards like “*” to read them all into an RDD at once. “All at once” doesn’t literally mean “go over the whole file and load it into memory”; remember, RDDs are lazily evaluated, so no data is read until an action is called. Just to make sure that we are on the same page, we are going to analyze simple files that we create ourselves. They contain the number of used cars sold per day. We won’t complicate anything; we’ll just create two files. Place them in a folder of your choosing on your file system:

Code Listing 95: Example File sold-units-2015-08.txt

1000
2000
1000
5000
4000
10000
11000
300
5500

Code Listing 96: Example File sold-units-2015-09.txt

4000
5000
6000
6000
3000
4000
500
700
8800

Now we are going to write example programs that analyze the data and compute the average number of cars sold per day over the two months. Let’s start with Scala:

Code Listing 97: Scala Example for Reading Sold Units

val salesRaw = sc.wholeTextFiles("sold-units-2015*")
val sales = salesRaw.map(x => x._2).
    flatMap(x => x.split("\n")).
    map(x => x.toDouble)

Note that each element of the RDD returned by wholeTextFiles is a pair holding the file path and the file content. We are interested only in the content, so we map to the second element of the tuple. Let’s say that we are interested in an average. Up until now, we haven’t mentioned that Spark supports arithmetic functions on RDDs that contain only doubles. Calculating averages is actually really easy in Spark:

Code Listing 98: Scala Showing Average Number of Sold Units

scala> sales.mean()

res30: Double = 4322.222222222223

Let’s do the same with Python:

Code Listing 99: Python Example for Reading Sold Units

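sales_raw = sc.wholeTextFiles("sold-units-2015*")
# Keep only the file content, split it into lines, skip blank lines, and convert to float.
sales = (sales_raw.map(lambda x: x[1])
         .flatMap(lambda x: x.splitlines())
         .filter(lambda x: x.strip())
         .map(float))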

Code Listing 100: Python Showing Average Number of Sold Units

>>> sales.mean()

4322.222222222223
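As a quick sanity check outside Spark, the same average can be reproduced in plain Python. This is only a sketch: the two lists restate the sample files from the listings above, and the variable names are made up for illustration.

```python
# Plain-Python cross-check of the Spark result; no Spark installation needed.
# The values restate the two sample files from the listings above.
august = [1000, 2000, 1000, 5000, 4000, 10000, 11000, 300, 5500]
september = [4000, 5000, 6000, 6000, 3000, 4000, 500, 700, 8800]

# Rough equivalent of reading both files and converting every line to a float.
sales = [float(x) for x in august + september]

# Rough equivalent of sales.mean(): total units divided by the number of days.
mean_units = sum(sales) / len(sales)
print(round(mean_units, 2))  # prints 4322.22
```

The total is 77,800 units over 18 days, which agrees with the value Spark reports.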

Let’s have a look at the Java version:

Code Listing 101: Java Example for Average Number of Sold Units

package com.syncfusion.succinctly.spark.examples;

import java.io.FileNotFoundException;

import java.io.PrintWriter;

import java.util.Arrays;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaDoubleRDD;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.DoubleFunction;

import org.apache.spark.api.java.function.FlatMapFunction;

import org.apache.spark.api.java.function.Function;

import scala.Tuple2;

public class WholeTextFileMeanUnitPriceJava7 {

              public static void main(String[] args) throws FileNotFoundException {

                            if (args.length < 1) {

                                          System.err.println("Please provide a full path to the input files");

                                          System.exit(0);

                            }

                            if (args.length < 2) {

                                          System.err.println("Please provide a full path to the output file");

                                          System.exit(0);

                            }

                            

                            SparkConf conf = new SparkConf()

                                                        .setAppName("WholeTextFileJava7")

                                                        .setMaster("local");

                            JavaSparkContext context = new JavaSparkContext(conf);

                            

                            JavaPairRDD<String, String> salesRaw = context.wholeTextFiles(args[0]);

                            

                            JavaDoubleRDD sales = salesRaw.map(new Function<Tuple2<String, String>, String>() {

                                          public String call(Tuple2<String, String> v1) throws Exception {

                                                        return v1._2();

                                          }

                            }).flatMap(new FlatMapFunction<String, String>() {

                                          public Iterable<String> call(String t) throws Exception {

                                                        return Arrays.asList(t.split("\n"));

                                          }

                            }).mapToDouble(new DoubleFunction<String>() {

                                          @Override

                                          public double call(String t) throws Exception {

                                                        return Double.parseDouble(t);

                                          }

                            });

                            

                            PrintWriter writer = new PrintWriter(args[1]);

                            writer.println(sales.mean());

                            writer.close();

              }

}

Be careful when submitting tasks to Spark. We want the file pattern to be resolved by Spark and not by the shell. Here is how I submitted the Java task to Spark:

Code Listing 102: Task Submit Operation on Linux

$ ./bin/spark-submit \
--class com.syncfusion.succinctly.spark.examples.WholeTextFileMeanUnitPriceJava7 \
--master local \
/root/spark/SparkExamples-0.0.1-SNAPSHOT.jar \
"/root/spark-1.4.1-bin-hadoop2.6/sold-units*" /root/spark/result.txt

$ cat /root/spark/result.txt
4322.222222222223

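You might run into trouble if you submit the task without quotation marks. The shell then expands the pattern into the individual file names before Spark sees it, so the second input file ends up in the position of the output argument. The output goes to one of the input files, you get no feedback when running, and you overwrite that input file.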
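The effect of the missing quotation marks can be simulated in plain Python. The following is a sketch under stated assumptions, not part of the Spark example: the temporary directory and the file contents are hypothetical, and glob is used to imitate the expansion that a typical shell performs on an unquoted pattern.

```python
import glob
import os
import tempfile

# Hypothetical stand-in for the directory that holds the two input files.
workdir = tempfile.mkdtemp()
for name in ("sold-units-2015-08.txt", "sold-units-2015-09.txt"):
    with open(os.path.join(workdir, name), "w") as f:
        f.write("1000\n")

pattern = os.path.join(workdir, "sold-units*")

# Quoted pattern: the program receives the pattern itself plus the output path.
quoted_args = [pattern, "result.txt"]

# Unquoted pattern: the shell expands it first (in sorted order), so the
# program receives every matching file name followed by the output path.
unquoted_args = sorted(glob.glob(pattern)) + ["result.txt"]

# The Java program treats the second argument as the output file.
print(os.path.basename(quoted_args[1]))    # result.txt
print(os.path.basename(unquoted_args[1]))  # sold-units-2015-09.txt
```

With the unquoted pattern, the program would read only the August file and write the result over the September file.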

Submitting Scala and Python Tasks to Spark

Until now we haven’t needed to submit Python and Scala tasks to Spark directly. We simply used the interactive shells and were fine. We only showed how to submit Java applications to Spark because Java, at the moment, does not have an interactive shell. In my opinion, it’s best to acquire information when you need it; otherwise you would probably just skim over the section in earlier parts of the book and completely forget about it. Now you will need it. Later in this chapter we are going to go over JSON serialization in the context of Spark, so we will need to include some libraries in our project. We will start with a simple application that counts the lines mentioning Spark. First, you have to create a directory and file structure like this one:

Code Listing 103: Basic Scala Project Structure

$ tree
.
├── simple.sbt
└── src
    └── main
        └── scala
            └── ExampleApp.scala

Code Listing 104: File Content of simple.sbt File

name := "Example application"

version := "1.0"

scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1"

Code Listing 105: File Content of ExampleApp.scala File

import org.apache.spark.SparkContext

import org.apache.spark.SparkContext._

import org.apache.spark.SparkConf

object ExampleApp {

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("ExampleApp")

    val sc = new SparkContext(conf)

    val readmeFile = sc.textFile(args(0))

    val sparkMentions = readmeFile.filter(line => line.contains("Spark"))

    scala.tools.nsc.io.File(args(1)).writeAll(sparkMentions.count().toString)

  }

}

When you are done creating directory structure and editing files, run the package command. This will create a .jar file. The process is very similar to building a Java application:

Code Listing 106: Running Package Command to Build .jar File

$ sbt package

This will result in creating the ./target/scala-2.10/example-application_2.10-1.0.jar file. Names can vary if you change versions. Submitting a task to Spark is then pretty similar to submitting a Java task:

Code Listing 107: Submitting Scala Example to Spark

$ ./bin/spark-submit \
--class ExampleApp \
--master local \
/root/spark/scala/ExampleApp/target/scala-2.10/example-application_2.10-1.0.jar \
/root/spark-1.4.1-bin-hadoop2.6/README.md /root/spark/scala_example.txt

Submitting a task to Spark is relatively simple with Python. The first part is creating a Python script somewhere on your file system. Let’s call it something simple like simple.py:

Code Listing 108: Python Script simple.py

import sys
from pyspark import SparkContext

sc = SparkContext(appName="ExampleApp")
readme_file = sc.textFile(sys.argv[1])
spark_mentions = readme_file.filter(lambda line: "Spark" in line)

# Write the count to the output file; the with block closes the file.
with open(sys.argv[2], "w") as out_file:
    out_file.write(str(spark_mentions.count()))

Running the example against Spark is then relatively easy:

Code Listing 109: Submitting Python Script simple.py to Spark

$ ./bin/spark-submit \
--master local \
/root/spark/python/simple.py \
/root/spark-1.4.1-bin-hadoop2.6/README.md /root/spark/python_example.txt

In the next section, we are going to go over techniques for working with JSON.

Working with JSON Files

JSON is one of the most popular data formats of today. It’s used everywhere, from configuration files to web page communication. That’s one of the main reasons why we are going to cover how to use it with Spark. One of the simplest methods of JSON loading is by treating external data as text and then mapping it by using a library. The same goes for writing it out. We’ll create a simple JSON file to use in our example:

Code Listing 110: Simple File with Multiple JSON Entries test.json

{"year": 2012, "name": "Honda Accord LX", "color": "gray"}
{"year": 2008, "name": "Audi Q7 4.2 Premium", "color": "white"}
{"year": 2006, "name": "Mercedes-Benz CLS-Class CLS500", "color": "black"}
{"year": 2012, "name": "Mitsubishi Lancer", "color": "black"}
{"year": 2010, "name": "BMW 3 Series 328i", "color": "gray"}
{"year": 2006, "name": "Buick Lucerne CX", "color": "gray"}
{"year": 2006, "name": "GMC Envoy", "color": "red"}

Let’s load this with Scala and then sort the data by years:

Code Listing 111: Scala Code to Load and Sort test.json

import org.apache.spark.SparkContext

import org.apache.spark.SparkContext._

import org.apache.spark.SparkConf

import com.fasterxml.jackson.module.scala._

import com.fasterxml.jackson.databind.ObjectMapper

import com.fasterxml.jackson.databind.DeserializationFeature

import java.io.StringWriter;

object JsonLoading {

  case class UsedCar(year: Int, name: String, color: String) extends Serializable

  def main(args: Array[String]) {

        val conf = new SparkConf().setAppName("JsonLoading")

        val sc = new SparkContext(conf)

        // mapper can't get serialized by spark at the moment

        // using workaround

        object Holder extends Serializable {

          @transient lazy val mapper = new ObjectMapper()

          mapper.registerModule(DefaultScalaModule)

        }

        val inputFile = sc.textFile(args(0))

        val usedCars = inputFile.map(x => Holder.mapper.readValue(x, classOf[UsedCar]))

        val carsByYear = usedCars.map(x => (x.year, x))

        val carsSortedByYear = carsByYear.sortByKey(false).collect()

       

        val out = new StringWriter()

        Holder.mapper.writeValue(out, carsSortedByYear)

        val json = out.toString()

        scala.tools.nsc.io.File(args(1)).writeAll(json)

    }

}

The resulting file looks like this:

Code Listing 112: Resulting JSON File

[

  [

    2012,

    {

      "year": 2012,

      "name": "Honda Accord LX",

      "color": "gray"

    }

  ],

  [

    2012,

    {

      "year": 2012,

      "name": "Mitsubishi Lancer",

      "color": "black"

    }

  ],

  [

    2010,

    {

      "year": 2010,

      "name": "BMW 3 Series 328i",

      "color": "gray"

    }

  ],

  [

    2008,

    {

      "year": 2008,

      "name": "Audi Q7 4.2 Premium",

      "color": "white"

    }

  ],

  [

    2006,

    {

      "year": 2006,

      "name": "Mercedes-Benz CLS-Class CLS500",

      "color": "black"

    }

  ],

  [

    2006,

    {

      "year": 2006,

      "name": "Buick Lucerne CX",

      "color": "gray"

    }

  ],

  [

    2006,

    {

      "year": 2006,

      "name": "GMC Envoy",

      "color": "red"

    }

  ]

]

Now let’s do something similar in Python:

Code Listing 113: Python Script to Sort the Entries in JSON

import sys

import json

from pyspark import SparkContext

sc = SparkContext(appName="JsonLoading")

input_file = sc.textFile(sys.argv[1])

used_cars = input_file.map(lambda x: json.loads(x))

cars_by_year = used_cars.map(lambda x: (x["year"], x))

cars_sorted_by_year = cars_by_year.sortByKey(False)

mapped = cars_sorted_by_year.map(lambda x: json.dumps(x))

mapped.saveAsTextFile(sys.argv[2])

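After submitting the Python script, you should get the following output. Note that saveAsTextFile creates a directory at the given path and writes the records into part files inside it: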

Code Listing 114: Result of Submitting Python Script

[2012, {"color": "gray", "name": "Honda Accord LX", "year": 2012}]

[2012, {"color": "black", "name": "Mitsubishi Lancer", "year": 2012}]

[2010, {"color": "gray", "name": "BMW 3 Series 328i", "year": 2010}]

[2008, {"color": "white", "name": "Audi Q7 4.2 Premium", "year": 2008}]

[2006, {"color": "black", "name": "Mercedes-Benz CLS-Class CLS500", "year": 2006}]

[2006, {"color": "gray", "name": "Buick Lucerne CX", "year": 2006}]

[2006, {"color": "red", "name": "GMC Envoy", "year": 2006}]
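The parse, key, sort, and serialize steps can also be tried without Spark. The sketch below is a plain-Python stand-in for the script above, using the records from test.json; the list-based pipeline is an illustration, not the Spark API, and Spark itself does not promise any particular order among records with the same key.

```python
import json

# The sample records from test.json, one JSON document per line.
lines = [
    '{"year": 2012, "name": "Honda Accord LX", "color": "gray"}',
    '{"year": 2008, "name": "Audi Q7 4.2 Premium", "color": "white"}',
    '{"year": 2006, "name": "Mercedes-Benz CLS-Class CLS500", "color": "black"}',
    '{"year": 2012, "name": "Mitsubishi Lancer", "color": "black"}',
    '{"year": 2010, "name": "BMW 3 Series 328i", "color": "gray"}',
    '{"year": 2006, "name": "Buick Lucerne CX", "color": "gray"}',
    '{"year": 2006, "name": "GMC Envoy", "color": "red"}',
]

# Same steps as the Spark script: parse each line, then key each car by year.
used_cars = [json.loads(line) for line in lines]
cars_by_year = [(car["year"], car) for car in used_cars]

# Sort descending by year. sorted() is stable, so cars from the same year
# keep their input order in this local version.
cars_sorted_by_year = sorted(cars_by_year, key=lambda pair: pair[0], reverse=True)

for pair in cars_sorted_by_year:
    # json.dumps writes a Python tuple as a JSON array.
    print(json.dumps(pair, sort_keys=True))
```

Each printed line is a two-element JSON array holding the year and the car, the same shape as the records in the listing above.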

As a final example in this section let’s have a look at how we could do JSON manipulations with Java:

Code Listing 115: Java Manipulating JSON

package com.syncfusion.succinctly.spark.examples;

import java.io.FileNotFoundException;

import java.io.PrintWriter;

import java.io.Serializable;

import java.util.List;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.api.java.function.PairFunction;

import com.fasterxml.jackson.core.JsonProcessingException;

import com.fasterxml.jackson.databind.ObjectMapper;

import scala.Tuple2;

public class JsonManipulationJava7 {

              

              public static class UsedCar implements Serializable {

                            public int year;

                            public String name;

                            public String color;

                            

                            // for object mapping

                            public UsedCar() {

                                          

                            }

                            public UsedCar(int year, String name, String color) {

                                          this.year = year;

                                          this.name = name;

                                          this.color = color;

                            }

              }

              public static void main(String[] args) throws FileNotFoundException, JsonProcessingException {

                            if (args.length < 1) {

                                          System.err.println("Please provide a full path to the input files");

                                          System.exit(0);

                            }

                            if (args.length < 2) {

                                          System.err.println("Please provide a full path to the output file");

                                          System.exit(0);

                            }

                            

                            SparkConf conf = new SparkConf()

                                                        .setAppName("JsonManipulationJava7")

                                                        .setMaster("local");

                            JavaSparkContext sc = new JavaSparkContext(conf);

                            

                            JavaRDD<String> inputFile = sc.textFile(args[0]);

                            

                            // final is required so the anonymous inner class below can use the mapper under Java 7
                            final ObjectMapper mapper = new ObjectMapper();

                            

                            JavaRDD <UsedCar> usedCars = inputFile.map(new Function<String, UsedCar>() {

                                          public UsedCar call(String v1) throws Exception {

                                                        return mapper.readValue(v1, UsedCar.class);

                                          }

                            });

                            

                            JavaPairRDD<Integer, UsedCar> carsByYear = usedCars.mapToPair(new PairFunction<UsedCar, Integer, UsedCar>() {

                                          @Override

                                          public Tuple2<Integer, UsedCar> call(UsedCar t) throws Exception {

                                                        return new Tuple2<Integer, UsedCar>(t.year, t);

                                          }

                            });

                            

                            List<Tuple2<Integer, UsedCar>> carsSortedByYear = carsByYear.sortByKey(false).collect();

                            

                            PrintWriter writer = new PrintWriter(args[1]);

                            writer.println(mapper.writeValueAsString(carsSortedByYear));

                            writer.close();

              }

}

Java also demonstrates exporting a tuple to JSON format. Just so that you can see what it looks like:

Code Listing 116: Result of Java JSON Manipulation

[

  {

    "_1": 2012,

    "_2": {

      "year": 2012,

      "name": "Honda Accord LX",

      "color": "gray"

    }

  },

  {

    "_1": 2012,

    "_2": {

      "year": 2012,

      "name": "Mitsubishi Lancer",

      "color": "black"

    }

  },

  {

    "_1": 2010,

    "_2": {

      "year": 2010,

      "name": "BMW 3 Series 328i",

      "color": "gray"

    }

  },

  {

    "_1": 2008,

    "_2": {

      "year": 2008,

      "name": "Audi Q7 4.2 Premium",

      "color": "white"

    }

  },

  {

    "_1": 2006,

    "_2": {

      "year": 2006,

      "name": "Mercedes-Benz CLS-Class CLS500",

      "color": "black"

    }

  },

  {

    "_1": 2006,

    "_2": {

      "year": 2006,

      "name": "Buick Lucerne CX",

      "color": "gray"

    }

  },

  {

    "_1": 2006,

    "_2": {

      "year": 2006,

      "name": "GMC Envoy",

      "color": "red"

    }

  }

]

There are a lot of other supported formats, including CSV, Sequence Files, and more. We won’t go into them since we just went over the most basic ones, but there is one more very important thing that we haven’t discussed. It’s how Spark interacts with databases. Spark has very good support for interacting with relational databases. Spark is also often described as a big-data processing framework. So perhaps it would be best to concentrate on one of the most popular big-data storage technologies of today. It’s Apache Cassandra, and it works very well with Spark.

Spark and Cassandra

Spark and Cassandra are a natural fit: big-data processing framework on one side and big-data storage technology on the other. Now, to be honest, I might be a little biased because I work with Cassandra a lot, and my personal opinion is that it’s better to talk about the things you know. I would also like to make a complete, end-to-end example using Spark and Cassandra.

It’s best that we start by installing Cassandra. In previous chapters we described how to install Java on your computer. That’s more than enough to start making things with Cassandra. The next step for you is to download Cassandra itself. There is a commercial version available from a company called DataStax, but to get you started the community version is just fine. Head over to http://www.planetcassandra.org/cassandra/ and download the appropriate version for your system. I won’t go into much detail as it’s really out of scope for this book, but you can find everything you need under http://www.planetcassandra.org/try-cassandra/. There are lots of tutorials available on how to set up Cassandra, but as I mentioned earlier the most important prerequisite is an installed version of Java. If you like using Syncfusion’s resources, there are a lot of free books available at http://www.syncfusion.com/resources/techportal/ebooks, and Cassandra Succinctly is one of them. So without further ado, from this point on I’m assuming you have a running Cassandra instance on your local machine. This section might also be a bit more challenging than the previous ones, but I’ll try to provide as much detail as possible.

One of Cassandra’s strong points is that it can store a lot of data in a relatively small amount of time. This is all thanks to Cassandra’s internals, at the core of which lies the concept of the wide row. We won’t go much into it from a theoretical point of view, but I think you will be able to follow along through examples. Cassandra currently has no support for aggregate operations like sum, average, and the like. The only available operation is count. Aggregate operations are huge performance killers, even in the relational world. Sooner or later there is one query that simply takes too long, and then administrators or engineers look into it, and what they most often do is create some sort of table or view that has the data prepared right away so that there is no need for joins. That’s actually how you model data with Cassandra, but to be honest, it’s a topic of its own. We will talk a little bit about internals. Unlike traditional databases, Cassandra can have up to two billion columns in a single row. That’s where the name wide row comes from. Cassandra keeps all those columns sorted all the time. You can access the values by sorted columns, but you can’t calculate averages and similar things as with relational databases, so you have to use other technologies, such as Spark in our case.

Navigate to the folder where you installed Cassandra and run a tool called cqlsh. It stands for Cassandra Query Language Shell. We’ll set up the data by using this shell, and then we’ll process the data by using Spark and store the results of our analysis back to Cassandra. This is mostly introductory material, so I’ll try to keep it simple. I won’t always use the best production-level practices because my goal is to get you started. If somebody with experience in the field is reading this, please don’t hold it against me. Once started, cqlsh should look something like this:

Code Listing 117: Running CQL Shell

$ ./cqlsh

Connected to Test Cluster at 127.0.0.1:9042.

[cqlsh 5.0.1 | Cassandra 2.1.8 | CQL spec 3.2.0 | Native protocol v3]

Use HELP for help.

cqlsh>

If you are running your cluster for the very first time, you don’t have defined keyspaces and tables to store data. Don’t think too much about keyspace; for now, you can think of it as a container for tables. Let’s define one. In fact, let’s define two of them right away. Spark will read data from one container and write them to the other one. There are usually multiple keyspaces in production. Note, however, that this is not a production level example:

Code Listing 118: Definition of Keyspaces for Cassandra Weather Station Example

CREATE KEYSPACE weather_in

WITH REPLICATION = {

    'class' : 'SimpleStrategy',

    'replication_factor' : 1

};

CREATE KEYSPACE weather_out

WITH REPLICATION = {

    'class' : 'SimpleStrategy',

    'replication_factor' : 1

};

Let’s create a table for input data:

Code Listing 119: Definition of Table for Cassandra Weather Station Example

use weather_in;

CREATE TABLE measurements (

    station text,

    time timestamp,

    temperature decimal,

    PRIMARY KEY (station, time)

);

Primary key syntax works a bit differently than in relational databases. Here, the first part of the primary key (the partition key) specifies which data ends up in the same long row. In our case, every weather station will have one large row. The second part of the primary key (the clustering column) specifies how data will be sorted within this long row. In our case, it will be sorted by time. Essentially, every time a reading comes in, it goes to the end of a long row.
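To make the idea of one sorted long row per station more concrete, here is a toy model in plain Python. It is a sketch for illustration only and says nothing about how Cassandra actually stores data; the names table, insert, and readings_between are hypothetical.

```python
import bisect
from collections import defaultdict

# Toy model: partition key -> list of (time, temperature) kept sorted by time.
table = defaultdict(list)

def insert(station, time, temperature):
    # The first part of the key picks the row; the second part keeps it ordered.
    bisect.insort(table[station], (time, temperature))

# Readings may arrive in any order; each row still ends up sorted by time.
insert("A", "2015-12-02 10:00:00", 3)
insert("A", "2015-12-01 10:00:00", 1)
insert("A", "2015-12-01 12:00:00", 2)
insert("B", "2015-12-01 10:00:00", 3)

def readings_between(station, start, end):
    # A time range inside one row is a cheap slice because the row is sorted.
    row = table[station]
    lo = bisect.bisect_left(row, (start,))
    hi = bisect.bisect_left(row, (end,))
    return row[lo:hi]

print(readings_between("A", "2015-12-01 00:00:00", "2015-12-02 00:00:00"))
```

Looking up a station and a time range only touches one sorted row. Finding all readings above a given temperature would mean scanning every row, which is the kind of query this layout is not built for.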

Let’s define some data on our own. We don’t care if that data is in Celsius or Fahrenheit, so we’ll keep the values around zero so that it’s possible on both scales. The difference will be cold and colder depending on which unit you prefer:

Code Listing 120: Some Data (Imagine It Came from a Weather Station)

INSERT INTO measurements (station, time, temperature) VALUES ('A', '2015-12-01 10:00:00', 1);

INSERT INTO measurements (station, time, temperature) VALUES ('A', '2015-12-01 12:00:00', 2);

INSERT INTO measurements (station, time, temperature) VALUES ('A', '2015-12-02 10:00:00', 3);

INSERT INTO measurements (station, time, temperature) VALUES ('A', '2015-12-02 12:00:00', 5);

INSERT INTO measurements (station, time, temperature) VALUES ('A', '2015-12-02 14:00:00', 5);

INSERT INTO measurements (station, time, temperature) VALUES ('B', '2015-12-01 10:00:00', 3);

INSERT INTO measurements (station, time, temperature) VALUES ('B', '2015-12-01 12:00:00', 4);

INSERT INTO measurements (station, time, temperature) VALUES ('B', '2015-12-02 10:00:00', 0);

INSERT INTO measurements (station, time, temperature) VALUES ('B', '2015-12-02 12:00:00', 2);

INSERT INTO measurements (station, time, temperature) VALUES ('B', '2015-12-02 14:00:00', 1);

You can use the star operator and select all of the data. But that’s not the best practice in production; Cassandra even has a default limit of 10,000 records so that you don’t break anything in production. The correct way to access the data is to provide the station where the measurement took place. For simplicity, we only have stations A and B. If we wanted to know what the temperature was around a certain time at both station A and station B, we would have no problem obtaining the data, since measurements are sorted by time. But if we tried to select all of the measurements where the temperature is above two degrees, the following would happen:

Code Listing 121: Trying to Select All Temperatures Above Certain Value

cqlsh:weather_in> SELECT * from measurements where station = 'A' AND temperature > 2;

InvalidRequest: code=2200 [Invalid query] message="No secondary indexes on the restricted columns support the provided operators: "

And that’s why we need Spark. Let’s say we are interested in the all-time average at each weather station. We’ll keep it simple, so we just need to store the weather station, the average, and the exact time when the average was calculated. We’ll let Spark populate the output table. Note that the connector’s saveToCassandra call writes to an existing table, so before running the example, create the all_time_average table in the weather_out keyspace with station, time, and average columns. To connect to Cassandra, we are going to use the Spark Cassandra Connector, available at https://github.com/datastax/spark-cassandra-connector. In this section we are only going to go over Scala code. You can look up the tutorial on the Spark Cassandra Connector and do the example provided here in the language of your preference. The concepts that we cover here are enough to get you started.

We went over how to make stand-alone Spark applications a couple of chapters ago. To connect to Cassandra, we need to add new dependencies. The problem is that the dependencies also have to include some other dependencies. And if we wanted to submit a task to Spark, Spark would not be able to locate referenced .jar files. We’ll have to adapt the project structure a little bit to enable building a so-called fat jar.

When we build this fat jar, it shouldn’t include files already included in Spark. To achieve this goal we are going to use a very popular tool, sbt-assembly, available at https://github.com/sbt/sbt-assembly/. We will have to modify the structure of the Spark example a bit. We have to create a new file, build.sbt, in the example’s root directory. This file should have the following content:

Code Listing 122: Create New build.sbt File in Scala Example Directory

lazy val commonSettings = Seq(

  version := "0.1-SNAPSHOT",

  organization := "syncfusion",

  scalaVersion := "2.10.5"

)

lazy val app = (project in file("app")).

  settings(commonSettings: _*).

  settings(

  )

Update contents of simple.sbt file:

Code Listing 123: Updated simple.sbt File in Scala Example Directory

name := "example-application"

version := "1.0"

scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1" % "provided"

libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.4.0-M3"

libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector-java" % "1.4.0-M3"

Create new Scala file:

Code Listing 124: Source Code File CassandraProcessing.scala

import org.apache.spark.SparkContext

import org.apache.spark.SparkContext._

import org.apache.spark.SparkConf

import com.datastax.spark.connector._

import com.datastax.driver.core._

import java.util.Date

object CassandraProcessing {

  case class Average(station: String, time: Date, average: BigDecimal)

  def main(args: Array[String]) {

    val conf = new SparkConf()

    .setAppName("CassandraProcessing")

    .set("spark.cassandra.connection.host", "127.0.0.1")

    val sc = new SparkContext(conf)

    val measurements = sc.cassandraTable("weather_in", "measurements")

    val measurementPairs = measurements.map(row => (row.getString("station"), row.getDecimal("temperature")))

    val averages = measurementPairs.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))

    val averagesForCassandra = averages.map(x => new Average(x._1, new Date(), x._2._1 / x._2._2))

    averagesForCassandra.saveToCassandra("weather_out", "all_time_average")

  }

}

Create the assembly.sbt file in the project folder with the following content:

Code Listing 125: File project/assembly.sbt

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")

Now create a fat jar by running the assembly command:

Code Listing 126: Running Assembly Command

$ sbt assembly

Watch the output of the assembly script and remember the location of the generated jar. Output will be different than in previous examples. Here is how my submit task looks:

Code Listing 127: Submit Task

$ ./bin/spark-submit \
--class CassandraProcessing \
--master local \
/root/spark/scala/ExampleApp/target/scala-2.10/example-application-assembly-1.0.jar

If everything went fine, you should see the results of processing in Cassandra.

Code Listing 128: Results of Processing in Cassandra

cqlsh> use weather_out;

cqlsh> select * from all_time_average;

 station | time                     | average
---------+--------------------------+---------
       B | 2015-08-17 22:12:25+0200 |       2
       A | 2015-08-17 22:12:25+0200 |     3.2

(2 rows)

The table in the previous code listing contains the all-time average for each station and a timestamp of when that average was computed. This example contains all the basic techniques that you will use if you get the chance to combine the two technologies in practice. The provided example is filled with patterns that can be applied to all sorts of tasks.
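The per-station averages can be checked by hand with a small plain-Python sketch. It mirrors the pattern used in the Scala code, where every value is paired with a count of one and the pairs are then summed per key; the dictionary-based fold below is an illustrative stand-in for mapValues and reduceByKey, not the Spark API.

```python
from decimal import Decimal

# The (station, temperature) pairs inserted into the measurements table.
measurements = [
    ("A", Decimal(1)), ("A", Decimal(2)), ("A", Decimal(3)),
    ("A", Decimal(5)), ("A", Decimal(5)),
    ("B", Decimal(3)), ("B", Decimal(4)), ("B", Decimal(0)),
    ("B", Decimal(2)), ("B", Decimal(1)),
]

# Fold every station's readings into a running (sum, count) pair.
totals = {}
for station, temperature in measurements:
    running_sum, count = totals.get(station, (Decimal(0), 0))
    totals[station] = (running_sum + temperature, count + 1)

# Divide the sum by the count to get the all-time average per station.
averages = {station: s / n for station, (s, n) in totals.items()}

for station in sorted(averages):
    print(station, averages[station])
```

Station A sums to 16 over five readings and station B sums to 10 over five readings, which gives 3.2 and 2, the same values that Spark stored in Cassandra.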

Note that although Spark and Cassandra are relatively easy technologies to learn, when it comes to production they have a lot of best practices that you have to be aware of. In a sense, we have just scratched the surface here. But even the longest journey starts with the first step, and I hope that this section will bring you to a totally new path in your life.

With this section we covered a very important milestone on your path to Spark mastery. Be aware that Spark has a very vibrant community, and that you can develop yourself further by looking up online materials and visiting Spark conferences and meetups. I hope you enjoyed this e-book and the provided examples. Let’s proceed to the conclusion.
