Spark can run in several modes, including YARN (client or cluster), Standalone, Mesos, and Local. For this training, we will use local mode. Specifically, you can start the Spark interactive shell from the terminal in local mode with two threads, for example with spark-shell --master "local[2]" --driver-memory 3G. You will then see the shell start up and stop at a scala> prompt.
Here you can set --driver-memory according to your local setup. If the driver memory you request is larger than the VM's memory, remember to increase the VM memory setting first.
In Spark, the main entry point of a Spark program is called the driver, and Spark distributes the computation to workers. In the interactive shell, the Spark shell program is the driver. In the example above we set the driver memory to 3GB because in local mode the driver and the worker run in the same process. A driver program accesses Spark through a SparkContext object, which represents a connection to a computing cluster. In the interactive shell, a SparkContext is already created for you as the variable sc. You can enter sc to see its type.
The Resilient Distributed Dataset (RDD) is Spark's core abstraction for working with data. An RDD is simply a fault-tolerant distributed collection of elements. You can think of an RDD as a large array whose elements you cannot access at random, but to which you can easily apply the same operation across all elements. In Spark, all work is expressed as creating new RDDs, transforming existing RDDs, or calling operations on RDDs to compute results. There are two ways to create RDDs: by distributing a collection of objects (e.g., a list or set), or by referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.
For demonstration purposes, the simplest way to create an RDD is to take an existing collection (e.g., a Scala Array) in your program and pass it to SparkContext's parallelize() method.
Once created, the distributed dataset (distData) can be operated on in parallel. For example, we can add up the elements by calling distData.reduce((a, b) => a + b). You will see more RDD operations later on.
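The parallelize() and reduce() calls described above can be sketched as follows. This is a minimal illustration, not the original listing: the array values are hypothetical, and SparkContext.getOrCreate is used so the snippet reuses the shell's sc when one exists and otherwise starts a local context.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the shell's context if one is running; otherwise start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("parallelize-sketch"))

// Hypothetical sample collection that lives in the driver's memory.
val data = Array(1, 2, 3, 4, 5)

// Distribute the collection across the workers as an RDD[Int].
val distData = sc.parallelize(data)

// reduce is an action: it combines all elements and returns the sum to the driver.
val total = distData.reduce((a, b) => a + b)
println(total)
```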
Parallelizing a collection is useful when you are learning Spark. However, it is not encouraged in production because it requires the entire dataset to fit in the memory of the driver's machine first. In production, load the data from an external dataset instead.
A common way to create RDDs is to load data from external storage. Below you will learn how to load data from a file system. We assume you have put some data into HDFS as described in the HDFS Basic section; if not, please do that first.
In the example above, each line of the original file becomes an element of the lines RDD.
To read data from a file system, Spark relies on the HDFS library. The example above assumes that HDFS is properly configured through environment variables or configuration files, and that the data is already in HDFS.
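As a rough sketch of loading a text file into an RDD, the snippet below uses textFile(). To stay runnable without a cluster it writes a small temporary local file with made-up contents and reads it through a file: URI; this is an assumption for illustration only. With HDFS configured as described above, you would pass the path of your own data to textFile() instead.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("textfile-sketch"))

// Hypothetical stand-in for a real input file: two short lines of text.
val path = Files.createTempFile("textfile-sketch", ".txt")
Files.write(path, "first line\nsecond line\n".getBytes("UTF-8"))

// Each line of the file becomes one String element of the RDD.
// The explicit file: URI avoids depending on the default file system setting.
val lines = sc.textFile(path.toUri.toString)
val lineCount = lines.count()
println(lineCount)
```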
RDDs offer two types of operations: transformations and actions.
Transformations, such as map() and filter(), return a new RDD.
Actions, such as first() and count(), return a result to the driver program.
Spark treats transformations and actions very differently, so it is important to understand which type of operation you are performing. You can check whether a function is a transformation or an action by looking at its return type: transformations return RDDs, whereas actions return some other data type.
All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the operations applied to some base dataset (e.g., an Array or a file). The transformations are only computed when an action requires a result to be returned to the driver program. Therefore, the earlier command that reads in a file has not actually been executed yet. We can force the evaluation of an RDD by calling any action on it.
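One way to observe this laziness is to count how often the function passed to map() actually runs. The sketch below uses a Spark accumulator for that purpose; the names calls and doubled are hypothetical, and the input is a small in-memory range rather than the course data.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("lazy-sketch"))

// Accumulator that records how many times the map function is invoked.
val calls = sc.longAccumulator("map calls")

// map is a transformation: it returns an RDD and does not run anything yet.
val doubled: RDD[Int] = sc.parallelize(1 to 4).map { x => calls.add(1); x * 2 }
val callsBeforeAction = calls.sum

// collect is an action: it returns an Array to the driver and triggers the computation.
val result: Array[Int] = doubled.collect()
val callsAfterAction = calls.sum

println(s"before: $callsBeforeAction, after: $callsAfterAction")
```

The return types also illustrate the rule of thumb above: map() yields an RDD, while collect() yields a plain Scala Array.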
Let's go through some common RDD operations using the healthcare dataset.
Recall that in the file case.csv, each line is a 4-field tuple (patient-id, event-id, timestamp, value).
To find out how large our raw event sequence data is, we can count the number of lines in the input file using the count operation, i.e. lines.count(). Clearly, count is an action.
You may wonder what the loaded data looks like. To take a peek, use take(k), which returns the first k elements of the RDD. Spark also provides collect(), which brings all the elements of the RDD back to the driver program. Note that collect() should only be used when the data is small. Both take and collect are actions.
We got the first 5 records in this RDD. However, this is hard to read due to a poor format. We can make it more readable by traversing the array to print each record on its own line.
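The count, take, and per-line printing steps above might look like the sketch below. Because case.csv is not available here, the lines RDD is built from a handful of made-up records in the (patient-id, event-id, timestamp, value) layout; the IDs, dates, and values are purely illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("peek-sketch"))

// Hypothetical records standing in for the contents of case.csv.
val lines = sc.parallelize(Seq(
  "p1,DIAG001,2014-01-01,1.0",
  "p1,PAYMENT,2014-01-02,120.0",
  "p2,PAYMENT,2014-01-03,80.0",
  "p2,DIAG002,2014-01-04,1.0",
  "p3,PAYMENT,2014-01-05,40.0",
  "p1,PAYMENT,2014-01-06,30.0"))

// Action: number of records.
val total = lines.count()

// Action: the first five records, returned to the driver as an Array[String].
val firstFive = lines.take(5)

// Print each record on its own line for readability.
firstFive.foreach(println)
```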
Note that in the three code examples above, the RDD lines has been computed (i.e., read in from the file) three times. We can prevent this by calling lines.cache(), which caches the RDD in memory to avoid reloading it.
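A small sketch of caching, again on made-up in-memory records rather than the real file: cache() only marks the RDD for in-memory storage, and the first action afterwards is what actually populates the cache.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("cache-sketch"))

// Hypothetical records standing in for the contents of case.csv.
val lines = sc.parallelize(Seq(
  "p1,DIAG001,2014-01-01,1.0",
  "p1,PAYMENT,2014-01-02,120.0",
  "p2,PAYMENT,2014-01-03,80.0"))

// Mark the RDD to be kept in memory; nothing is computed yet.
lines.cache()

// The first action computes the RDD and stores its partitions.
val firstCount = lines.count()

// Later actions can reuse the cached partitions instead of recomputing them.
val secondCount = lines.count()

println(lines.getStorageLevel == StorageLevel.MEMORY_ONLY)
```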
The map operation in Spark is similar to that of Hadoop. It is a transformation that turns each item in the RDD into a new item by applying the provided function. Note that map produces exactly one output element for each input element. For example, if we are only interested in the IDs of patients, we can use map to keep only the first field of each line.
It is also possible to write a more complex, multi-line map function. In this case, curly braces should be used in place of parentheses. For example, we can get both patient-id and event-id as a tuple at the same time.
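Both forms of map described above, the one-line version and the multi-line version with curly braces, can be sketched as follows. The sample records and the variable names patientIds and idPairs are assumptions for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("map-sketch"))

// Hypothetical records in the (patient-id, event-id, timestamp, value) layout.
val lines = sc.parallelize(Seq(
  "p1,DIAG001,2014-01-01,1.0",
  "p1,PAYMENT,2014-01-02,120.0",
  "p2,PAYMENT,2014-01-03,80.0"))

// One-line map: keep only the first field, the patient ID.
val patientIds = lines.map(line => line.split(",")(0))

// Multi-line map: curly braces allow several statements before the result.
val idPairs = lines.map { line =>
  val fields = line.split(",")
  (fields(0), fields(1)) // (patient-id, event-id)
}

patientIds.collect().foreach(println)
idPairs.collect().foreach(println)
```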
As its name indicates, filter transforms an RDD into another RDD by keeping only the elements that satisfy the filtering condition. For example, suppose we want to count the number of events collected for a particular patient to verify the amount of data from that patient. We can use a filter function.
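A minimal sketch of that filter-then-count pattern is shown below. The patient ID "p1" and the sample records are hypothetical, and matching on the first field is only one possible way to write the condition.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("filter-sketch"))

// Hypothetical records in the (patient-id, event-id, timestamp, value) layout.
val lines = sc.parallelize(Seq(
  "p1,DIAG001,2014-01-01,1.0",
  "p1,PAYMENT,2014-01-02,120.0",
  "p2,PAYMENT,2014-01-03,80.0",
  "p3,PAYMENT,2014-01-04,40.0",
  "p1,PAYMENT,2014-01-05,30.0"))

// Transformation: keep only the records whose first field is the patient of interest.
val p1Events = lines.filter(line => line.split(",")(0) == "p1")

// Action: count the matching records.
val p1Count = p1Events.count()
println(p1Count)
```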
distinct is a transformation that turns an RDD into another RDD by eliminating duplicates. We can use it to count the number of distinct patients. To do this, we first extract the patient ID from each line.
We use the map() function as described above. In this example, we transform each line into the corresponding patient ID by extracting only the first column. We then eliminate duplicate IDs with the distinct() function.
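The map-then-distinct steps just described can be sketched like this, using made-up sample records in place of case.csv.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("distinct-sketch"))

// Hypothetical records in the (patient-id, event-id, timestamp, value) layout.
val lines = sc.parallelize(Seq(
  "p1,DIAG001,2014-01-01,1.0",
  "p1,PAYMENT,2014-01-02,120.0",
  "p2,PAYMENT,2014-01-03,80.0",
  "p3,PAYMENT,2014-01-04,40.0",
  "p1,PAYMENT,2014-01-05,30.0"))

// Extract the first column, drop duplicate IDs, then count what remains.
val distinctPatients = lines.map(line => line.split(",")(0)).distinct()
val patientCount = distinctPatients.count()
println(patientCount)
```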
Sometimes you will need to group the input events by patient-id to put everything about each patient together. For example, to extract the index date for predictive modeling, you may first group the input data by patient and then handle each patient separately in parallel. After grouping, each element of the RDD is a (key, value) pair (patient-id, Iterable[event]).
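One way to build such a grouped RDD is sketched below. It assumes the events are first keyed by patient ID with map and then grouped with groupByKey; the original may have used a different call, and the sample records are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("group-sketch"))

// Hypothetical records in the (patient-id, event-id, timestamp, value) layout.
val lines = sc.parallelize(Seq(
  "p1,DIAG001,2014-01-01,1.0",
  "p1,PAYMENT,2014-01-02,120.0",
  "p2,PAYMENT,2014-01-03,80.0",
  "p3,PAYMENT,2014-01-04,40.0",
  "p1,PAYMENT,2014-01-05,30.0"))

// Key each raw event line by its patient ID, then gather all events per patient.
val byPatient: RDD[(String, Iterable[String])] =
  lines.map(line => (line.split(",")(0), line)).groupByKey()

// Number of events per patient, collected to the driver as a Map.
val eventsPerPatient = byPatient.mapValues(_.size).collect().toMap
println(eventsPerPatient)
```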
Conceptually, reduceByKey transforms an RDD[(K, V)] into an RDD[(K, List[V])] (like what groupByKey does) and then applies the reduce function to each List[V] to get the final output RDD[(K, V)]. Note that we intentionally denote the return type of the reduce function as V: it must be the same as the type of the list elements. Suppose now we want to calculate the total payment for each patient. A payment record in the dataset has the form (patient-id, PAYMENT, timestamp, value).
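A sketch of the total-payment computation with filter, map, and reduceByKey follows. The sample records are made up, and the variable name payment_events is taken from the surrounding text while payments and totals are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("reducebykey-sketch"))

// Hypothetical records in the (patient-id, event-id, timestamp, value) layout.
val lines = sc.parallelize(Seq(
  "p1,DIAG001,2014-01-01,1.0",
  "p1,PAYMENT,2014-01-02,120.0",
  "p2,PAYMENT,2014-01-03,80.0",
  "p3,PAYMENT,2014-01-04,40.0",
  "p1,PAYMENT,2014-01-05,30.0"))

// Keep only the payment records.
val payment_events = lines.filter(line => line.split(",")(1) == "PAYMENT")

// Turn each record into a (patient-id, payment) pair.
val payments = payment_events.map { line =>
  val fields = line.split(",")
  (fields(0), fields(3).toDouble)
}

// Sum the payments that share the same patient-id.
val totals = payments.reduceByKey(_ + _)
val totalsByPatient = totals.collect().toMap
println(totalsByPatient)
```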
The payment_events RDD returned by filter contains the records associated with payments. Each item is then transformed into a key-value pair (patient-id, payment) with map. Because each patient can have multiple payments, we need to use reduceByKey to sum up the payments for each patient. In this example, patient-id serves as the key, and payment is the value to sum up for each patient. The figure below shows the process of reduceByKey in our example.
We can then find the top-3 patients with the highest payment by first sorting with sortBy and then taking the first three results.
Again, in sortBy we use the _ placeholder, so that _._2 is an anonymous function that returns the second element of a tuple, which is the total payment of a patient. The second parameter of sortBy controls the sort order. In the example above, false means descending order.
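The sortBy step discussed above can be sketched as follows. The per-patient totals are hypothetical values created in memory so that the snippet stands on its own.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("sortby-sketch"))

// Hypothetical (patient-id, total payment) pairs.
val totals = sc.parallelize(Seq(
  ("p1", 150.0), ("p2", 80.0), ("p3", 40.0), ("p4", 200.0)))

// Sort by the second tuple element; false requests descending order.
// take(3) then returns the three largest totals to the driver.
val top3 = totals.sortBy(_._2, false).take(3)
top3.foreach(println)
```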
Here, reduceByKey(math.max) is the simplified form of reduceByKey(math.max(_,_)) or reduceByKey((a,b) => math.max(a,b)). math.max is a Scala function that returns the larger of its two parameters.
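As an illustration of reducing with math.max, the sketch below computes the largest single payment per patient. This use case and the sample pairs are assumptions, since the original listing is not shown here; the explicit lambda form is used, which the text above describes as equivalent to the shorter reduceByKey(math.max).

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("max-sketch"))

// Hypothetical (patient-id, payment) pairs; p1 has two payments.
val payments = sc.parallelize(Seq(
  ("p1", 120.0), ("p2", 80.0), ("p3", 40.0), ("p1", 30.0)))

// For each patient, keep the larger of any two payments until one value remains.
val maxPayments = payments.reduceByKey((a, b) => math.max(a, b))
val maxByPatient = maxPayments.collect().toMap
println(maxByPatient)
```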
Now that we have the total payment for each patient, we can run some basic statistics. For RDDs that consist of numeric values, Spark provides some useful statistical primitives.
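A brief sketch of these primitives on an RDD of Double values is given below. The payment totals are hypothetical; stats() computes count, mean, min, max, and related summaries in a single pass.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("stats-sketch"))

// Hypothetical total payments, one value per patient.
val paymentValues = sc.parallelize(Seq(150.0, 80.0, 40.0))

// stats() is an action that returns a StatCounter summary to the driver.
val summary = paymentValues.stats()
println(s"count=${summary.count} mean=${summary.mean} min=${summary.min} max=${summary.max}")

// Individual primitives are also available directly on the RDD.
val totalPaid = paymentValues.sum()
```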
RDDs support many set operations, such as union and intersection, even when the RDDs themselves are not strictly sets. For example, we can combine two files with the union function. Note that union here is not strictly identical to the union operation in mathematics, because Spark will not remove duplicates.
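The behavior of union with respect to duplicates can be seen in a small sketch. Two in-memory RDDs with made-up records stand in for the two files; one record appears in both.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("union-sketch"))

// Hypothetical stand-ins for the contents of two input files.
val linesA = sc.parallelize(Seq(
  "p1,PAYMENT,2014-01-02,120.0",
  "p2,PAYMENT,2014-01-03,80.0"))
val linesB = sc.parallelize(Seq(
  "p2,PAYMENT,2014-01-03,80.0",
  "p3,PAYMENT,2014-01-04,40.0"))

// union keeps every element of both RDDs, including the shared record.
val combined = linesA.union(linesB)
val combinedCount = combined.count()

// Apply distinct() explicitly if set semantics are needed.
val uniqueCount = combined.distinct().count()

// intersection returns only the elements present in both RDDs.
val shared = linesA.intersection(linesB).collect()
```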
Here, a more straightforward way is to pass the directory name to read all the files in that directory into a single RDD.
A Dataset is a new interface added in Spark 1.6 that tries to provide the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine.
A Dataset has a concrete type, which is either a Scala primitive type (Int, Long, Boolean, etc.) or a subclass of Product, such as a case class. The case class is preferred for Spark because it handles the serialization code for you, thus allowing Spark to shuffle data between workers. Serialization can additionally be handled by implementing Externalizable, which is a much more efficient mechanism, or by using a compact serializer like Kryo.
Additionally, since Spark 2.0 programs no longer need to use SparkContext directly; the preferred entry point is a SparkSession, which encapsulates a SparkContext and a SQLContext. SparkSession is a member of the org.apache.spark.sql package.
There is a wealth of great documentation on the Spark development site.
Datasets can be created explicitly or loaded from a source (e.g., file, stream, parquet, etc.).
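A minimal sketch of creating a Dataset explicitly from a case class is shown below. The Event case class, its field names, and the sample values are hypothetical; SparkSession.builder().getOrCreate() reuses the shell's session when one exists.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical record type; a case class lets Spark derive the encoder it needs.
case class Event(patientId: String, eventId: String, value: Double)

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("dataset-sketch")
  .getOrCreate()

// Brings the implicit encoders and the toDS() conversion into scope.
import spark.implicits._

// Create a typed Dataset explicitly from a local collection.
val events: Dataset[Event] = Seq(
  Event("p1", "PAYMENT", 120.0),
  Event("p1", "DIAG001", 1.0),
  Event("p2", "PAYMENT", 80.0)).toDS()

// Typed lambda, as with RDDs; fields are accessed by name rather than by position.
val paymentCount = events.filter(e => e.eventId == "PAYMENT").count()

// The underlying SparkContext is still reachable through the session.
val sc = spark.sparkContext
println(paymentCount)
```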
For the complete list of RDD operations, please see the Spark Programming Guide.