Spark best practices
Spark execution model
Spark’s simplicity makes it all too easy to ignore its execution model and still write jobs that eventually complete. With larger datasets, understanding what happens under the hood becomes critical to reduce run time and avoid out-of-memory errors.
Let’s start by taking our good old word-count friend as a starting example:
# Count word occurrences and bring the results back to the driver.
counts = sc.textFile("input.txt")\
    .flatMap(lambda line: line.split())\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda x, y: x + y, 3)\
    .collect()
RDD operations are compiled into a directed acyclic graph (DAG) of RDD objects, where each RDD points to the parent it depends on.
At shuffle boundaries, the DAG is partitioned into so-called stages that are executed in order.
The shuffle is Spark’s mechanism for re-distributing data so that it’s grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation.
To organize data for the shuffle, Spark generates sets of tasks - map tasks to organize the data and reduce tasks to aggregate it. This nomenclature comes from MapReduce and does not directly relate to Spark’s map and reduce operations. Operations within a stage are pipelined into tasks that can run in parallel.
Stages, tasks and shuffle writes and reads are concrete concepts that can be monitored from the Spark UI. The UI can be accessed from the driver node on port 4040.
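To see the DAG that Spark builds, you can inspect an RDD’s lineage from the driver before triggering any action. The sketch below rebuilds the word-count chain without the final collect(); the indentation changes in the printed output mark the shuffle (stage) boundaries.
# Rebuild the word-count lineage without collect(), then print the DAG.
counts = sc.textFile("input.txt")\
    .flatMap(lambda line: line.split())\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda x, y: x + y, 3)

# toDebugString() may return bytes depending on the PySpark version.
lineage = counts.toDebugString()
print(lineage.decode() if isinstance(lineage, bytes) else lineage)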
Best practices
Spark UI
Running Spark jobs without the Spark UI is like flying blind. The UI lets you monitor and inspect the execution of jobs. To access it remotely, a SOCKS proxy is needed, as the UI also connects to the worker nodes.
Using a proxy management tool like FoxyProxy lets you automatically filter URLs based on text patterns and limit the proxy settings to domains that match a set of rules. The browser add-on automatically turns the proxy on and off when you switch between viewing websites hosted on the master node and those on the Internet.
Assuming that you launched your Spark cluster with the EMR service on AWS, type the following command to open a SOCKS proxy:
ssh -i ~/mykeypair.pem -N -D 8157 hadoop@ec2-...-compute-1.amazonaws.com
Finally, import the following configuration into FoxyProxy:
<?xml version="1.0" encoding="UTF-8"?>
<foxyproxy>
  <proxies>
    <proxy name="emr-socks-proxy" notes="" fromSubscription="false" enabled="true" mode="manual" selectedTabIndex="2" lastresort="false" animatedIcons="true" includeInCycle="true" color="#0055E5" proxyDNS="true" noInternalIPs="false" autoconfMode="pac" clearCacheBeforeUse="false" disableCache="false" clearCookiesBeforeUse="false" rejectCookies="false">
      <matches>
        <match enabled="true" name="*ec2*.amazonaws.com*" pattern="*ec2*.amazonaws.com*" isRegEx="false" isBlackList="false" isMultiLine="false" caseSensitive="false" fromSubscription="false" />
        <match enabled="true" name="*ec2*.compute*" pattern="*ec2*.compute*" isRegEx="false" isBlackList="false" isMultiLine="false" caseSensitive="false" fromSubscription="false" />
        <match enabled="true" name="10.*" pattern="http://10.*" isRegEx="false" isBlackList="false" isMultiLine="false" caseSensitive="false" fromSubscription="false" />
        <match enabled="true" name="*10*.amazonaws.com*" pattern="*10*.amazonaws.com*" isRegEx="false" isBlackList="false" isMultiLine="false" caseSensitive="false" fromSubscription="false" />
        <match enabled="true" name="*10*.compute*" pattern="*10*.compute*" isRegEx="false" isBlackList="false" isMultiLine="false" caseSensitive="false" fromSubscription="false" />
        <match enabled="true" name="*localhost*" pattern="*localhost*" isRegEx="false" isBlackList="false" isMultiLine="false" caseSensitive="false" fromSubscription="false" />
      </matches>
      <manualconf host="localhost" port="8157" socksversion="5" isSocks="true" username="" password="" domain="" />
    </proxy>
  </proxies>
</foxyproxy>
Once the proxy is enabled, you can open the Spark UI by visiting localhost:4040.
Use the right level of parallelism
Clusters will not be fully utilized unless the level of parallelism for each operation is high enough. Spark automatically sets the number of partitions of an input file according to its size. For distributed shuffles, such as groupByKey and reduceByKey, it uses the largest parent RDD’s number of partitions. You can pass the level of parallelism as a second argument to an operation. In general, 2-3 tasks per CPU core in your cluster are recommended. That said, having tasks that are too small is also not advisable, as there is some overhead paid to schedule and run a task.
As a rule of thumb, tasks should take at least 100 ms to execute; you can ensure that this is the case by monitoring the task execution latency from the Spark UI. If your tasks take considerably longer than that, keep increasing the level of parallelism by, say, a factor of 1.5 until performance stops improving.
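As a minimal sketch, parallelism can be raised either by passing the number of partitions to the shuffle operation itself or by repartitioning afterwards; the values 48 and 96 below are purely illustrative (roughly 2-3 tasks per core on a 16- to 32-core cluster).
counts = sc.textFile("input.txt")\
    .flatMap(lambda line: line.split())\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda x, y: x + y, 48)  # 48 reduce partitions (illustrative)

# Repartitioning changes the parallelism of the downstream stages.
more_parallel = counts.repartition(96)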
Reduce working set size
Sometimes, you will get terrible performance or out of memory errors, because the working set of one of your tasks, such as one of the reduce tasks in groupByKey, was too large. Spark’s shuffle operations (sortByKey, groupByKey, reduceByKey, join, etc.) build a hash table within each task to perform the grouping, which can often be large.
Even though those tables spill to disk, getting to the point where the tables need to be spilled increases the memory pressure on the executor, incurring the additional overhead of disk I/O and increased garbage collection. If you are using pyspark, the memory pressure will also increase the chance of Python running out of memory.
The simplest fix here is to increase the level of parallelism, so that each task’s input set is smaller.
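For example, assuming pairs is a key-value RDD, passing a larger partition count (the 200 below is purely illustrative) to the shuffle operation gives each task a smaller slice of the keys:
# Hypothetical pair RDD keyed by the first comma-separated field.
pairs = sc.textFile("input.txt").map(lambda line: (line.split(",")[0], line))

# More partitions means a smaller per-task hash table.
grouped = pairs.groupByKey(numPartitions=200)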
Avoid groupByKey for associative operations
Both reduceByKey and groupByKey can be used for the same purposes, but reduceByKey works much better on a large dataset. That’s because Spark knows it can combine output with a common key on each partition before shuffling the data.
In reduce tasks, key-value pairs are kept in a hash table that can spill to disk, as mentioned in “Reduce working set size.” However, the hash table flushes out the data to disk one key at a time. If a single key has more values than can fit in memory, an out of memory exception occurs. Pre-combining the keys on the mappers before the shuffle operation can drastically reduce the memory pressure and the amount of data shuffled over the network.
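A short sketch of the difference, reusing the word-count pairs: both versions produce the same counts, but only the second combines values per partition before the shuffle.
pairs = sc.textFile("input.txt")\
    .flatMap(lambda line: line.split())\
    .map(lambda word: (word, 1))

# Ships every (word, 1) pair across the network, then sums on the reducers.
counts_grouped = pairs.groupByKey().mapValues(sum)

# Pre-combines the counts on each partition before shuffling.
counts_reduced = pairs.reduceByKey(lambda x, y: x + y)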
Avoid reduceByKey when the input and output value types are different
Consider the job of creating a set of strings for each key:
# Wrap each value in a single-element set, then merge the sets pair-wise.
rdd.map(lambda p: (p[0], {p[1]}))\
    .reduceByKey(lambda x, y: x | y)\
    .collect()
Note how the input values are strings and the output values are sets. The map operation creates lots of temporary small objects. A better way to handle this scenario is to use aggregateByKey:
def seq_op(xs, x):
    xs.add(x)
    return xs

def comb_op(xs, ys):
    return xs | ys

rdd.aggregateByKey(set(), seq_op, comb_op).collect()
Avoid the flatMap-join-groupBy pattern
When two datasets are already grouped by key and you want to join them and keep them grouped, you can just use cogroup. That avoids all the overhead associated with unpacking and repacking the groups.
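A minimal sketch with hypothetical data, where both RDDs are already keyed by user id:
ratings = sc.parallelize([("u1", 5), ("u1", 3), ("u2", 4)])
reviews = sc.parallelize([("u1", "great"), ("u2", "ok")])

# cogroup yields, for each key, one iterable of values per input RDD,
# without flattening and re-grouping anything.
grouped = ratings.cogroup(reviews)\
    .mapValues(lambda vs: (list(vs[0]), list(vs[1])))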
Python memory overhead
The spark.executor.memory option, which determines the amount of memory to use per executor process, is JVM specific. If you are using pyspark, you can’t set that option to be equal to the total amount of memory available to an executor node, as the JVM might eventually use all the available memory, leaving nothing behind for Python.
Use broadcast variables
Using the broadcast functionality available in SparkContext can greatly reduce the size of each serialized task, and the cost of launching a job over a cluster. If your tasks use any large object from the driver program, like a static lookup table, consider turning it into a broadcast variable.
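A minimal sketch, assuming a hypothetical country_names lookup table that would otherwise be serialized into every task’s closure:
country_names = {"IT": "Italy", "FR": "France", "DE": "Germany"}
bc_names = sc.broadcast(country_names)

events = sc.parallelize([("IT", 1), ("FR", 2), ("IT", 3)])

# Each task reads the table from the broadcast variable instead of
# receiving its own serialized copy with the task.
labelled = events.map(lambda kv: (bc_names.value[kv[0]], kv[1]))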
Cache judiciously
Just because you can cache an RDD in memory doesn’t mean you should blindly do so. Depending on how many times the dataset is accessed and the amount of work involved in recomputing it, recomputation can be cheaper than the increased memory pressure caused by caching.
It should go without saying that if you only read a dataset once, there is no point in caching it as it will make your job slower. The size of cached datasets can be seen from the Spark UI.
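For instance, in this sketch caching only pays off because the same dataset feeds two separate actions:
# words is reused by two actions, so caching avoids re-reading the file.
words = sc.textFile("input.txt").flatMap(lambda line: line.split()).cache()

total = words.count()
distinct = words.distinct().count()

words.unpersist()  # release the cached blocks once they are no longer needed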
Don’t collect large RDDs
When a collect operation is issued on an RDD, the dataset is copied to the driver, i.e. the master node. A memory exception will be thrown if the dataset is too large to fit in memory; take or takeSample can be used to retrieve only a capped number of elements instead.
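For example, assuming rdd is the dataset you would otherwise collect:
first_100 = rdd.take(100)                # first 100 elements only
sample_100 = rdd.takeSample(False, 100)  # 100 random elements, without replacement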
Minimize amount of data shuffled
A shuffle is an expensive operation since it involves disk I/O, data serialization, and network I/O as each reducer has to pull data across the network from all the mappers.
As of Spark 1.3, the temporary files that mappers create are not cleaned up from Spark’s temporary storage until Spark is stopped, which means that long-running Spark jobs may consume all available disk space. This is done to avoid re-computing shuffles.
Use DataFrames
A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a pandas DataFrame in Python.
Before any computation on a DataFrame starts, the Catalyst optimizer compiles the operations that were used to build the DataFrame into a physical plan for execution. Since the optimizer generates JVM bytecode for execution, pyspark users will experience the same high performance as Scala users.
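As a minimal sketch, the aggregation below is compiled by Catalyst into a single optimized physical plan; the file name and column names are placeholders, and older Spark versions expose the same API through sqlContext instead of SparkSession:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("dataframe-example").getOrCreate()

df = spark.read.parquet("events.parquet")  # placeholder input path
daily = df.groupBy("date").agg(F.count("*").alias("events"))
daily.show()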