1. overview
every spark app consists of a driver program that runs the user’s main function and executes various parallel operations on a cluster
the main abstraction spark provides is a resilient distributed dataset (rdd)
rdds are created by starting with a file in the hadoop file system or an existing scala collection in the driver program
users may also ask spark to persist an rdd in memory so it can be reused efficiently across parallel operations
rdds automatically recover from node failures
a second abstraction in spark is shared variables that can be used in parallel operations
spark supports two types of shared variables:
broadcast variables cache a value in memory on all nodes
accumulators are variables that are only "added" to, such as counters and sums
2. linking with spark
spark 1.3.0 works with python 2.6 or higher (but not python 3)
it uses the standard cpython interpreter so c libraries like numpy can be used
to run spark apps in python or submit them to a cluster, use bin/spark-submit
to launch an interactive python shell, use bin/pyspark
you need to import some spark classes into your program
from pyspark import SparkContext, SparkConf
3. initializing spark
the first thing a spark program must do is to create a SparkContext object
which tells spark how to access a cluster
to create a SparkContext you first need to build a SparkConf object that contains information about your app
    conf = SparkConf().setAppName(appName).setMaster(master)
    sc = SparkContext(conf=conf)
appName is a name for your app to show on the cluster UI
master is a Spark, Mesos or YARN cluster URL, or a special "local" string to run in local mode
in practice when running on a cluster, you will not want to hardcode master in the program, but rather launch the app with spark-submit and receive it there
for local testing and unit tests you can pass “local” to run spark in-process
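the local master strings can be read as thread counts: "local" runs one worker thread, "local[K]" runs K, and "local[*]" runs as many as there are logical cores. a minimal sketch of that reading follows; local_threads is a hypothetical helper written for these notes, not part of spark, and it covers only these three forms.

```python
import multiprocessing
import re


def local_threads(master):
    """Return the worker-thread count implied by a local master string.

    Hypothetical helper (not a spark api). Returns None when the string
    is not one of: "local", "local[K]", "local[*]".
    """
    if master == "local":
        return 1
    match = re.match(r"^local\[(\d+|\*)\]$", master)
    if match is None:
        return None  # e.g. a spark://, mesos:// or yarn master
    if match.group(1) == "*":
        return multiprocessing.cpu_count()
    return int(match.group(1))


one = local_threads("local")
four = local_threads("local[4]")
cluster = local_threads("spark://host:7077")  # None: not a local master
```

this is only a way to remember the notation; spark parses the master string itself when the SparkContext is created.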
using the shell
in the pyspark shell a special interpreter-aware SparkContext is already created for you, called sc
making your own SparkContext will not work
use the --master argument to set which master the context connects to
use --py-files with a comma-separated list to add python .zip, .egg or .py files to the runtime path
use --packages to add dependencies to your shell session
to run pyspark on exactly four cores
    $ /usr/local/bin/pyspark --master local[4]
to also add code.py to the search path (in order to later be able to import code)
    $ /usr/local/bin/pyspark --master local[4] --py-files code.py
launch pyspark shell in ipython
    $ PYSPARK_DRIVER_PYTHON=ipython /usr/local/bin/pyspark
    $ PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook --profile pyspark --notebook-dir=~/Document/hadoop" /usr/local/bin/pyspark
4. resilient distributed datasets (rdds)
parallelized collection
    data = [1, 2, 3, 4, 5]
    distData = sc.parallelize(data)
    # once created, distData can be operated on in parallel
    distData.reduce(lambda a, b: a + b)
two ways to create rdds
parallelizing an existing collection in your driver program
referencing a dataset in an external system
such as a shared filesystem, hdfs, hbase, or any data source offering a hadoop InputFormat
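one important parameter for parallelized collections is the number of partitions to cut the dataset into
spark will run one task for each partition of the cluster
typically you want 2-4 partitions for each cpu in your cluster
spark tries to set the number automatically, but you can set it manually by passing it as a second argument to parallelize
    sc.parallelize(data, 10)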
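to get a feel for what "cutting the dataset into partitions" means, here is a plain-python sketch that splits a list into contiguous, near-equal slices. slice_collection is a hypothetical helper for illustration; spark's own slicing logic may differ in detail.

```python
def slice_collection(data, num_slices):
    """Cut a list into num_slices contiguous, near-equal partitions.

    Illustrative only: a model of even slicing, not spark's code.
    """
    if num_slices < 1:
        raise ValueError("num_slices must be at least 1")
    n = len(data)
    return [data[(i * n) // num_slices:((i + 1) * n) // num_slices]
            for i in range(num_slices)]


partitions = slice_collection(list(range(10)), 3)
# partitions == [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
```

with one task per partition, more slices than cores lets faster workers pick up extra tasks, which is one way to read the 2-4 partitions per cpu guideline.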
external datasets
spark can create rdds from any storage source supported by hadoop, including your local file system, hdfs, cassandra, hbase, amazon s3
spark supports text files, SequenceFiles, any other hadoop InputFormat
text file rdds can be created using SparkContext’s textFile method
    >>> distFile = sc.textFile("data.txt")
reading files with spark
if using a path on the local filesystem, the file must also be accessible at the same path on worker nodes; either
    1) copy the file to all workers
    2) use a network-mounted shared file system
textFile also supports running on directories, compressed files, and wildcards
    textFile("/my/directory")
    textFile("/my/directory/*.txt")
    textFile("/my/directory/*.gz")
textFile takes an optional second argument for controlling the number of partitions of the file; by default spark creates one partition for each block of the file
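apart from text files, spark’s python api also supports several other data formats
SparkContext.wholeTextFiles reads a directory containing multiple small text files and returns each as a (filename, content) pair
RDD.saveAsPickleFile and SparkContext.pickleFile support saving an RDD in a simple format consisting of pickled python objects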
SequenceFile and hadoop Input/Output Formats
saving and loading SequenceFiles
    >>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
    >>> rdd.saveAsSequenceFile("path/to/file")
    >>> sorted(sc.sequenceFile("path/to/file").collect())
    [(1, u'a'), (2, u'aa'), (3, u'aaa')]
rdd operations
    lines = sc.textFile("data.txt")
    line_lengths = lines.map(lambda l: len(l))
    # to use line_lengths again later, persist it before the first action computes it
    line_lengths.persist()
    total_length = line_lengths.reduce(lambda a, b: a + b)
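the snippet above relies on two ideas: transformations such as map are lazy, and persist keeps a computed result around for reuse. the following plain-python toy models both. LazyDataset is a hypothetical class made up for these notes, not a spark api, and it ignores distribution entirely.

```python
from functools import reduce


class LazyDataset(object):
    """Toy stand-in for an rdd: map is lazy, reduce is an action."""

    def __init__(self, source, func=None):
        self._source = source   # a list, or a parent LazyDataset
        self._func = func       # function applied to each parent element
        self._persist = False
        self._cache = None

    def map(self, func):
        # nothing is computed here; the function is only recorded
        return LazyDataset(self, func)

    def persist(self):
        self._persist = True
        return self

    def _compute(self):
        if self._cache is not None:
            return self._cache
        if self._func is None:
            result = list(self._source)
        else:
            result = [self._func(x) for x in self._source._compute()]
        if self._persist:
            self._cache = result
        return result

    def reduce(self, func):
        # an action: forces the recorded transformations to run
        return reduce(func, self._compute())


calls = []


def length(line):
    calls.append(line)  # record every invocation
    return len(line)


lines = LazyDataset(["spark", "rdd", "python"])
line_lengths = lines.map(length)
calls_after_map = len(calls)  # still 0: map did no work
line_lengths.persist()
total_length = line_lengths.reduce(lambda a, b: a + b)
longest = line_lengths.reduce(max)  # served from the cached result
calls_total = len(calls)  # length ran once per line, not twice
```

without the persist call, the second reduce would run length over every line again.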
passing functions to spark
    # pass a longer function
    """my_script.py"""
    if __name__ == "__main__":
        def my_func(s):
            words = s.split(' ')
            return len(words)

        sc = SparkContext(...)
        sc.textFile("data").map(my_func)

    # pass a reference to a method: the whole object is sent to the cluster
    class MyClass(object):
        def func(self, s):
            return s
        def do_stuff(self, rdd):
            return rdd.map(self.func)

    # accessing fields will reference the whole object
    class MyClass(object):
        def __init__(self):
            self.field = 'hello'
        def do_stuff(self, rdd):
            return rdd.map(lambda s: self.field + s)

    # to avoid this issue, copy the field into a local variable first
    def do_stuff(self, rdd):
        field = self.field
        return rdd.map(lambda s: field + s)
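the difference between the last two variants can be checked in plain python by looking at what each lambda closes over. captured_values is a hypothetical helper for this check; the assumption is that whatever a function captures is what has to be shipped along with it.

```python
class MyClass(object):
    def __init__(self):
        self.field = 'hello'

    def func_using_self(self):
        # the lambda refers to self, so it captures the whole object
        return lambda s: self.field + s

    def func_using_local(self):
        # the lambda refers only to the local string
        field = self.field
        return lambda s: field + s


def captured_values(func):
    """List the values a function closes over (hypothetical helper)."""
    cells = func.__closure__ or ()
    return [cell.cell_contents for cell in cells]


obj = MyClass()
with_self = captured_values(obj.func_using_self())
with_local = captured_values(obj.func_using_local())
# with_self holds obj itself; with_local holds only the string 'hello'
```

both lambdas compute the same result; only the captured state differs.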
working with key-value pairs
    lines = sc.textFile("data.txt")
    pairs = lines.map(lambda l: (l, 1))
    count = pairs.reduceByKey(lambda a, b: a + b)
    count.sortByKey()
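what reduceByKey followed by sortByKey computes can be modelled on a local list. reduce_by_key below is a hypothetical single-machine helper, not spark's implementation, which merges values per partition and then shuffles.

```python
def reduce_by_key(pairs, func):
    """Merge the values of each key with func (single-machine model)."""
    merged = {}
    for key, value in pairs:
        if key in merged:
            merged[key] = func(merged[key], value)
        else:
            merged[key] = value
    return merged


lines = ["b", "a", "b", "c", "b"]
pairs = [(l, 1) for l in lines]
counts = reduce_by_key(pairs, lambda a, b: a + b)
sorted_counts = sorted(counts.items())
# sorted_counts == [('a', 1), ('b', 3), ('c', 1)]
```

the merge function is applied pairwise per key, so it should be associative for the result to be independent of evaluation order.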
transformations: map, filter, flatMap, mapPartitions, mapPartitionsWithIndex, sample, union, intersection, distinct, groupByKey, reduceByKey, aggregateByKey, sortByKey, join, cogroup, cartesian, pipe, coalesce, repartition, repartitionAndSortWithinPartitions
actions: reduce, collect, count, first, take, takeSample, takeOrdered, saveAsTextFile, saveAsSequenceFile, saveAsObjectFile, countByKey, foreach
shuffle operations
background
performance impact
rdd persistence
which storage level to choose
removing data
spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion; to remove an rdd manually, use RDD.unpersist()
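least-recently-used eviction can be sketched with an ordered dictionary. LruPartitionCache is a hypothetical toy that counts capacity in number of partitions; it makes no claim about how spark tracks memory or sizes.

```python
from collections import OrderedDict


class LruPartitionCache(object):
    """Toy cache that evicts the least recently used entry first."""

    def __init__(self, capacity):
        self.capacity = capacity
        self._entries = OrderedDict()  # oldest entry first

    def get(self, key):
        if key not in self._entries:
            return None
        # re-insert so the key becomes the most recently used
        value = self._entries.pop(key)
        self._entries[key] = value
        return value

    def put(self, key, value):
        if key in self._entries:
            self._entries.pop(key)
        self._entries[key] = value
        while len(self._entries) > self.capacity:
            self._entries.popitem(last=False)  # drop the oldest entry

    def keys(self):
        return list(self._entries)


cache = LruPartitionCache(capacity=2)
cache.put("p0", [1, 2])
cache.put("p1", [3, 4])
cache.get("p0")            # p0 becomes the most recently used
cache.put("p2", [5, 6])    # over capacity: p1 is evicted
remaining = cache.keys()   # ['p0', 'p2']
```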
5. shared variables
broadcast variables
    >>> broadcast_var = sc.broadcast([1, 2, 3])
    >>> broadcast_var.value
    [1, 2, 3]
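accumulators
    >>> accum = sc.accumulator(0)
    >>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
    >>> accum.value
    10

    # custom accumulator type; Vector is a placeholder class, not part of spark
    from pyspark.accumulators import AccumulatorParam

    class VectorAccumulator(AccumulatorParam):
        def zero(self, init_value):
            return Vector.zeros(init_value)
        def addInPlace(self, v1, v2):
            v1 += v2
            return v1

    vecAccum = sc.accumulator(Vector(...), VectorAccumulator())

    # accumulator updates inside a lazy transformation only happen once an action runs
    accum = sc.accumulator(0)
    def g(x):
        accum.add(x)
        return f(x)
    data.map(g)
    # accum is still 0 here because no action has forced the map to be computed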
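the two methods of a custom accumulator type can be read as "give me an empty value" and "merge two values". the plain-python model below uses the method names from pyspark's AccumulatorParam (zero, addInPlace); ListSumParam and run_with_accumulator are hypothetical, and the per-partition loop is only a simplified picture of how partial results could be combined.

```python
class ListSumParam(object):
    """Element-wise sum of equal-length lists (hypothetical example type)."""

    def zero(self, value):
        # an "empty" value with the same shape as the initial one
        return [0] * len(value)

    def addInPlace(self, v1, v2):
        for i, x in enumerate(v2):
            v1[i] += x
        return v1


def run_with_accumulator(partitions, initial, param):
    """Simplified model: each partition accumulates locally from zero,
    then its partial result is merged into the running total."""
    total = list(initial)  # copy so the caller's list is not mutated
    for part in partitions:
        local = param.zero(initial)
        for item in part:
            local = param.addInPlace(local, item)
        total = param.addInPlace(total, local)
    return total


partitions = [[[1, 2], [3, 4]], [[5, 6]]]
result = run_with_accumulator(partitions, [0, 0], ListSumParam())
# result == [9, 12]
```

the merge should be associative and commutative, since the order in which partial results arrive is not fixed.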
6. deploying to a cluster
7. unit testing
8. migrating from pre-1.0 versions of spark
9. where to go from here
run examples
./bin/run-example SparkPi
./bin/spark-submit examples/src/main/python/pi.py