Intro to PySpark

June 20, 2016 -- PyLadies ATX

Environment Setup

We'll use the free Databricks Community Edition platform to run our Spark jobs:

  1. Use Google Chrome browser (Firefox should also work, but not Internet Explorer, Safari, etc.)
  2. Sign up for the Community Edition account here: https://databricks.com/try-databricks

Or feel free to use a local installation of Spark instead. If Spark isn't already installed on your machine, it can take up to an hour to download and build it from source (there are also pre-built versions that are faster to set up):

  1. Download: http://spark.apache.org/downloads.html
  2. Open: ../spark1.6.1/README.md
  3. Build: ../spark1.6.1/build/mvn -DskipTests clean package
    • On my laptop, time to build was ~30 mins.

Overview

  • Spark and PySpark: What and Why
  • Exercises:
    • Word Count
    • Logistic Regression
    • Clickstream
  • References and Resources

Spark: What and Why

What is Spark?

Spark is a fast and expressive cluster computing system for Big Data computation. It's good for iterative tasks, large batch processing, and interactive data exploration.

It's compatible with Hadoop-supported file systems and data formats (HDFS, S3, SequenceFile, ...), so if you've been using Hadoop you can use it with your existing data and deploy it on your existing clusters.

It achieves fault tolerance through lineage: if you lose a partition (chunk) of data you can reconstruct it through a set of transformations that act on data stored in memory. This is in contrast to distributed shared memory systems where you have to write to disk and roll back.
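
As a quick illustration (a small sketch added here, not from the original slides), you can see the lineage Spark records for an RDD with toDebugString():

In [ ]:
# Build an RDD through a couple of transformations, then print the lineage
# Spark would replay to reconstruct a lost partition.
rdd = sc.parallelize(range(1000))
squares = rdd.map(lambda x: x * x)
evens = squares.filter(lambda x: x % 2 == 0)

print(evens.toDebugString())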

Why use Spark?

  • Speed
  • Ease of Use
  • Generality
  • Runs Everywhere

"Although current frameworks provide numerous abstractions for accessing a cluster’s computational resources, they lack abstractions for leveraging distributed memory. This makes them inefficient for an important class of emerging applications: those that reuse intermediate results across multiple computations. Data reuse is common in many iterative machine learning and graph algorithms, including PageRank, K-means clustering, and logistic regression. Another compelling use case is interactive data mining, where a user runs multiple ad-hoc queries on the same subset of the data. Unfortunately, in most current frameworks, the only way to reuse data between computations (e.g., between two MapReduce jobs) is to write it to an external stable storage system, e.g., a distributed file system. This incurs substantial overheads due to data replication, disk I/O, and serialization, which can dominate application execution times."

  • Zaharia et al., "Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing," In NSDI '12, April 2012

Spark vs MapReduce vs MPI vs ...

Spark Architecture

  • Spark Driver and Workers
  • SparkContext (replaced by SparkSession in version 2.X)
  • Spark applications run as independent sets of processes on a cluster, coordinated by the SparkContext object in your main program (called the driver program).
  • SparkContext can connect to several types of cluster managers (either Spark’s own standalone cluster manager, Mesos or YARN)

https://spark.apache.org/docs/1.1.0/cluster-overview.html

Spark Programming Concepts (version 1.X)

  • SparkContext: entry point to Spark functions
  • Resilient Distributed Datasets (RDDs):
    • Immutable, distributed collections of objects
    • Can be cached in memory for fast reuse
  • Operations on RDDs:
    • Transformations: define a new RDD (map, join, ...)
    • Actions: return or output a result (count, save, ...)
  • Two ways to create RDDs:
    1. By parallelizing an existing collection in your driver program:
      data = [1, 2, 3, 4, 5]
      distData = sc.parallelize(data)
    2. Or by referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat:
      distFile = sc.textFile("data.txt")

      http://spark.apache.org/docs/latest/programming-guide.html
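
For example, here is a minimal sketch chaining a transformation and an action on RDDs created both ways (the file name is just a placeholder; point it at any text file you have):

In [ ]:
# RDD from a Python collection
distData = sc.parallelize([1, 2, 3, 4, 5])
print(distData.map(lambda x: x * 10).sum())   # transformation + action -> 150

# RDD from an external text file (replace the path with a real file)
distFile = sc.textFile("data.txt")
print(distFile.count())                       # number of lines in the file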

Spark Data Interfaces (versions 1.X and 2.X)

There are several key interfaces that you should understand when you go to use Spark.

  • The Dataset
    • The Dataset is Apache Spark's newest distributed collection and can be considered a combination of DataFrames and RDDs. It provides the typed interface that is available in RDDs while providing a lot of conveniences of DataFrames. It will be the core abstraction going forward.
  • The DataFrame
    • The DataFrame is a collection of distributed Row types. These provide a flexible interface and are similar in concept to the DataFrames you may be familiar with in Python (pandas) as well as in the R language.
  • The RDD (Resilient Distributed Dataset)
    • Apache Spark's first abstraction was the RDD or Resilient Distributed Dataset. Essentially it is an interface to a sequence of data objects that consist of one or more types that are located across a variety of machines in a cluster. RDDs can be created in a variety of ways and are the "lowest level" API available to the user. While this is the original data structure made available, new users should focus on Datasets as those will be supersets of the current RDD functionality.

(slide taken from "Introduction to Apache Spark on Databricks" notebook)

What is PySpark?

  • The Python API for Spark
  • Run interactive jobs in the shell
  • Supports numpy, pandas and other Python libraries

Why use PySpark?

  • If you already know Python
  • Can use Spark in tandem with your favorite Python libraries
  • If you don't need Python libraries, maybe just write code in Scala
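
Picking up the bullet about using Spark in tandem with Python libraries: here is a small sketch (assuming NumPy is available on the cluster, as it is on Databricks) calling a NumPy function inside a Spark transformation:

In [ ]:
import numpy as np

# Each row is processed on the workers with ordinary Python + NumPy code.
rdd = sc.parallelize([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
norms = rdd.map(lambda row: float(np.linalg.norm(row)))
print(norms.collect())   # [2.236..., 5.0, 7.81...]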

PySpark's core classes (version 1.X):

  • pyspark.SparkContext
    Main entry point for Spark functionality.
  • pyspark.RDD
    A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
  • pyspark.streaming.StreamingContext
    Main entry point for Spark Streaming functionality.
  • pyspark.streaming.DStream
    A Discretized Stream (DStream), the basic abstraction in Spark Streaming.
  • pyspark.sql.SQLContext
    Main entry point for DataFrame and SQL functionality.
  • pyspark.sql.DataFrame
    A distributed collection of data grouped into named columns.
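
To see how these classes fit together in a 1.X notebook, here is a small sketch (the names and values are made up; on Databricks sc already exists):

In [ ]:
from pyspark import SparkContext
from pyspark.sql import SQLContext

# On Databricks `sc` is created for you; locally you would build it yourself:
# sc = SparkContext(appName="pyladies-demo")
sqlContext = SQLContext(sc)

rdd = sc.parallelize([("Alice", 34), ("Bob", 36)])     # pyspark.RDD
df = sqlContext.createDataFrame(rdd, ["name", "age"])  # pyspark.sql.DataFrame
df.filter(df.age > 35).show()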

Transformations

  • Transform one RDD into another, new RDD (RDDs are immutable)

  Transformation | Description | Type
  map(func) | Apply a function over each element | Narrow
  flatMap(func) | Apply a function over each element, then flatten the output | Narrow
  filter(func) | Keep only the elements for which the function returns True | Narrow
  sample(withReplacement, fraction, seed) | Return a sampled subset of this RDD | Narrow
  groupByKey() | Group the values for each key in the RDD into a single sequence | Wide
  reduceByKey(func) | Merge the values for each key using an associative reduce function | Wide

https://dzone.com/articles/big-data-processing-spark
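
A quick sketch (not from the slides) to make the map vs. flatMap distinction concrete:

In [ ]:
rdd = sc.parallelize(["hello world", "hi"])

# map produces one output element per input element (here, a list of words per line)...
print(rdd.map(lambda line: line.split(" ")).collect())      # [['hello', 'world'], ['hi']]

# ...while flatMap flattens those per-line lists into a single RDD of words.
print(rdd.flatMap(lambda line: line.split(" ")).collect())  # ['hello', 'world', 'hi']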

Actions

  • Return or output a result
  Action | Description | Try it Out*
  collect() | Return a list that contains all of the elements in this RDD | sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
  count() | Return the number of elements | sc.parallelize([2, 3, 4]).count()
  saveAsTextFile(path) | Save as a text file, using string representations of elements | sc.parallelize(['foo', '-', 'bar', '!']).saveAsTextFile("/FileStore/foo-bar.txt")
  first() | Return the first element | sc.parallelize([2, 3, 4]).first()
  take(num) | Take the first num elements | sc.parallelize([2, 3, 4, 5, 6]).take(2)

* Try it Out:

  1. Go to your Databricks Workspace and create a new directory within your Users directory called "2016-06-20-pyladies-pyspark"
  2. Create a notebook called "0-Introduction" within this directory
  3. Type or copy/paste lines of code into separate cells and run them (you will be prompted to launch a cluster)

When using Databricks the SparkContext is created for you automatically as sc.

In the Databricks Community Edition there are no worker nodes: the driver program (master) runs all of the code itself.

Try a couple more examples with transformations and actions:

In [ ]:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.groupByKey().mapValues(len).collect())
sorted(rdd.groupByKey().mapValues(list).collect())
In [ ]:
from operator import add

rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.reduceByKey(add).collect())

We've shown only a subset of possible transformations and actions. Check out others for your application in the docs: http://spark.apache.org/docs/latest/api/python/pyspark.html

Example: Log Mining

In [ ]:
lines = sc.textFile("hdfs://...")
errors = lines.filter(lambda line: line.startswith("ERROR"))
messages = errors.map(lambda line: line.split("\t")[2])

messages.filter(lambda line: "foo" in line).count()

The computation is expressed declaratively: the transformations are only recorded, and nothing actually runs until count() is called at the end.
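
The classic version of this log-mining example also typically caches the parsed messages so that repeated queries reuse the in-memory data rather than re-reading the files; a sketch:

In [ ]:
# Keep the parsed messages in memory so later queries don't re-read
# and re-parse the log files.
messages.cache()

messages.filter(lambda line: "foo" in line).count()
messages.filter(lambda line: "bar" in line).count()   # served from the cached data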

Exercises

Exercise 1: Word Count

Create a few transformations to build a dataset of (String, Int) pairs called counts and then save it to a file.

  1. Create a notebook in "2016-06-20-pyladies-pyspark" called "1-WordCount"
  2. Try to implement the following Word Count example:

http://spark.apache.org/examples.html

In [ ]:
text_file = sc.textFile("hdfs://...")
counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs://...")
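
To try this on Databricks without an HDFS path, one option (the input and output paths here are assumptions; any text file works) is to read a file from DBFS and peek at the most frequent words with takeOrdered:

In [ ]:
text_file = sc.textFile("/databricks-datasets/README.md")   # assumed sample file
counts = (text_file.flatMap(lambda line: line.split(" "))
                   .map(lambda word: (word, 1))
                   .reduceByKey(lambda a, b: a + b))
print(counts.takeOrdered(10, key=lambda kv: -kv[1]))         # ten most frequent words
counts.saveAsTextFile("/FileStore/wordcounts")               # writes a directory of part files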

Exercise 2: Logistic Regression

  1. Create a notebook in "2016-06-20-pyladies-pyspark" called "2-LogisticRegression"
  2. Try to implement the following Logistic Regression example:
In [ ]:
from pyspark.ml.classification import LogisticRegression

# Every record of this DataFrame contains the label and
# features represented by a vector. (`data` is a list of
# (label, features) rows; see the sketch after this cell for
# one way to build a toy version.)
df = sqlContext.createDataFrame(data, ["label", "features"])

# Set parameters for the algorithm.
# Here, we limit the number of iterations to 10.
lr = LogisticRegression(maxIter=10)

# Fit the model to the data.
model = lr.fit(df)

# Given a dataset, predict each point's label, and show the results.
model.transform(df).show()
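
The snippet above leaves data undefined (it comes from the Spark examples page). If you want something you can actually run, one option is a tiny hand-made dataset; a sketch, assuming Spark 1.X where the DataFrame-based ML API takes pyspark.mllib.linalg vectors (in 2.X this moves to pyspark.ml.linalg):

In [ ]:
from pyspark.mllib.linalg import Vectors   # pyspark.ml.linalg on Spark 2.X

# Four toy (label, features) rows, just enough to fit a model on.
data = [
    (1.0, Vectors.dense([0.0, 1.1, 0.1])),
    (0.0, Vectors.dense([2.0, 1.0, -1.0])),
    (0.0, Vectors.dense([2.0, 1.3, 1.0])),
    (1.0, Vectors.dense([0.0, 1.2, -0.5])),
]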

Exercise 3: Clickstream

  1. Create a notebook in "2016-06-20-pyladies-pyspark" called "3-Clickstream"
  2. Implement the Clickstream example from the Databricks "Quick Start DataFrames" notebook in Python
  3. Or just run through the example in Scala

Thanks for Coming!

Reach out to Meghann Agarwal with any questions or comments on this talk.