We'll use the free Databricks Community Edition platform to run our Spark jobs:
Alternatively, feel free to use a local installation of Spark. If Spark isn't already installed on your machine, it can take up to an hour to download and build from source (there are also pre-built versions that are faster to set up):
../spark1.6.1/README.md
../spark1.6.1/build/mvn -DskipTests clean package
Spark is a fast and expressive cluster computing system for Big Data computation. It's good for iterative tasks, big batch processing, and interactive data exploration.
It's compatible with Hadoop-supported file systems and data formats (HDFS, S3, SequenceFile, ...), so if you've been using Hadoop you can use it with your existing data and deploy it on your existing clusters.
It achieves fault tolerance through lineage: if you lose a partition (chunk) of data, you can reconstruct it by re-applying the transformations that produced it from data still held in memory. This is in contrast to distributed shared memory systems, where fault tolerance requires writing checkpoints to disk and rolling back.
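To see lineage in practice, each RDD can print the chain of transformations it was built from. A minimal sketch (the file name is just a placeholder):

lines = sc.textFile("data.txt")                          # base RDD (placeholder file)
errors = lines.filter(lambda l: l.startswith("ERROR"))   # derived RDD
print(errors.toDebugString())                            # shows the lineage Spark would replay on failure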
"Although current frameworks provide numerous abstractions for accessing a cluster’s computational resources, they lack abstractions for leveraging distributed memory. This makes them inefficient for an important class of emerging applications: those that reuse intermediate results across multiple computations. Data reuse is common in many iterative machine learning and graph algorithms, including PageRank, K-means clustering, and logistic regression. Another compelling use case is interactive data mining, where a user runs multiple ad-hoc queries on the same subset of the data. Unfortunately, in most current frameworks, the only way to reuse data between computations (e.g., between two MapReduce jobs) is to write it to an external stable storage system, e.g., a distributed file system. This incurs substantial overheads due to data replication, disk I/O, and serialization, which can dominate application execution times."
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)      # distribute a local collection as an RDD
distFile = sc.textFile("data.txt")   # create an RDD from a text file, one element per line
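Once an RDD exists you can run actions on it; for example, summing the parallelized collection above:

distData.reduce(lambda a, b: a + b)   # => 15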
There are several key interfaces that you should understand when you go to use Spark.
DataFrames are distributed collections of Row types. These provide a flexible interface and are similar in concept to the DataFrames you may be familiar with in Python (pandas) as well as in the R language. (Slide taken from the "Introduction to Apache Spark on Databricks" notebook.)
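As a minimal sketch (assuming the sqlContext that Databricks provides in Spark 1.6), you can build a DataFrame directly from Row objects; the names and values here are just toy data:

from pyspark.sql import Row

rows = [Row(name="Alice", age=34), Row(name="Bob", age=29)]   # hypothetical sample data
df = sqlContext.createDataFrame(rows)
df.printSchema()   # schema is inferred from the Row fields
df.show()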
Transformation | Description | Type |
---|---|---|
map(func) | Apply a function over each element | Narrow |
flatMap(func) | Map then flatten output | Narrow |
filter(func) | Keep only elements where function is True | Narrow |
sample(withReplacement, fraction, seed) | Return a sampled subset of this RDD | Narrow |
groupByKey() | Group the values for each key in the RDD into a single sequence | Wide |
reduceByKey(func) | Merge the values for each key using an associative reduce function | Wide |
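To make the narrow transformations above concrete, here's a small sketch you can paste into a notebook cell (it only assumes the sc provided by Databricks):

nums = sc.parallelize([1, 2, 3, 4, 5])
nums.map(lambda x: x * x).collect()            # [1, 4, 9, 16, 25]
nums.filter(lambda x: x % 2 == 0).collect()    # [2, 4]
sc.parallelize(["hello world", "hi"]).flatMap(lambda s: s.split(" ")).collect()
# ['hello', 'world', 'hi']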
Action | Description | Try it Out* |
---|---|---|
collect() | Return a list that contains all of the elements in this RDD | sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect() |
count() | Return the number of elements | sc.parallelize([2, 3, 4]).count() |
saveAsTextFile(path) | Save as a text file, using string representations of elements | sc.parallelize(['foo', '-', 'bar', '!']).saveAsTextFile("/FileStore/foo-bar.txt") |
first() | Return the first element | sc.parallelize([2, 3, 4]).first() |
take(num) | Take the first num elements | sc.parallelize([2, 3, 4, 5, 6]).take(2) |
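The collect() example above also uses glom(), which returns one list per partition, so you can see how the data was split. A quick sketch:

sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()   # [[0], [2], [3], [4], [6]]
sc.parallelize([0, 2, 3, 4, 6], 2).glom().collect()   # e.g. [[0, 2], [3, 4, 6]]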
When using Databricks, the SparkContext is created for you automatically as sc.
In the Databricks Community Edition there are no Worker Nodes - the Driver Program (Master) executes the entire code.
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.groupByKey().mapValues(len).collect())    # [('a', 2), ('b', 1)]
sorted(rdd.groupByKey().mapValues(list).collect())   # [('a', [1, 1]), ('b', [1])]
from operator import add
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.reduceByKey(add).collect())   # [('a', 2), ('b', 1)]
We've shown only a subset of possible transformations and actions. Check out others for your application in the docs: http://spark.apache.org/docs/latest/api/python/pyspark.html
val lines = sc.textFile("hdfs://...")              // base RDD
val errors = lines.filter(_.startsWith("ERROR"))   // transformation (lazy)
val messages = errors.map(_.split('\t')(2))        // transformation (lazy)
messages.filter(_.contains("foo")).count()         // action: triggers the computation
The computation is expressed declaratively, and nothing actually takes place until count is called at the end.
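The same lazy behavior holds in PySpark; a quick sketch mirroring the Scala example above (the file path is a placeholder):

lines = sc.textFile("hdfs://...")                          # no data read yet
errors = lines.filter(lambda l: l.startswith("ERROR"))     # still just a recipe
errors.count()                                             # this action triggers the job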
Create a few transformations to build a dataset of (String, Int) pairs called counts and then save it to a file.
text_file = sc.textFile("hdfs://...")
counts = text_file.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs://...")
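To peek at the result without writing it out, you could sort by count; a small sketch using takeOrdered on the counts RDD above:

counts.takeOrdered(10, key=lambda kv: -kv[1])   # ten most frequent (word, count) pairs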
from pyspark.ml.classification import LogisticRegression

# Every record of this DataFrame contains the label and
# features represented by a vector.
df = sqlContext.createDataFrame(data, ["label", "features"])
# Set parameters for the algorithm.
# Here, we limit the number of iterations to 10.
lr = LogisticRegression(maxIter=10)
# Fit the model to the data.
model = lr.fit(df)
# Given a dataset, predict each point's label, and show the results.
model.transform(df).show()
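The data variable isn't defined in the snippet above; one way it could look (a hypothetical sketch using the Spark 1.6 mllib Vectors type) is a list of (label, feature-vector) pairs:

from pyspark.mllib.linalg import Vectors

# hypothetical toy data: (label, features) pairs
data = [(1.0, Vectors.dense(0.0, 1.1, 0.1)),
        (0.0, Vectors.dense(2.0, 1.0, -1.0)),
        (0.0, Vectors.dense(2.0, 1.3, 1.0)),
        (1.0, Vectors.dense(0.0, 1.2, -0.5))]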
Reach out to Meghann Agarwal with any questions or comments on this talk.