TL;DR: Executor setup is crucial to the performance of a Spark cluster. Executor parameters can be tuned to your hardware configuration to make the best use of it. I built a small web app that lets you do just that.
Spark jobs make use of executors, which are task-running applications that themselves run on a node of the cluster. Spark jobs are subdivided into tasks that are distributed to the executors according to the type of operations and the underlying structure of the data.
Executors can run multiple tasks simultaneously and use any amount of the physical RAM available on a single node. An executor runs on exactly one node (usually a single machine or VM). The main configuration is determined by the following set of parameters:
spark.executor.instances defines the total number of executors available to Spark;
spark.executor.cores defines how many CPU cores each executor is allowed to use. This directly impacts its multi-tasking ability;
spark.executor.memory defines how much RAM each executor can use.
This is equivalent to using the spark-submit tool and specifying the parameters on the command line:
spark-submit --class <CLASS_NAME> \
    --num-executors ? \
    --executor-cores ? \
    --executor-memory ? [...]
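As a rough illustration, the mapping between the three properties and the spark-submit flags can be sketched in Python. The helper name build_spark_submit_args, the class name and the numeric values are hypothetical; only the flag names come from spark-submit itself.

```python
def build_spark_submit_args(class_name, num_executors, executor_cores, executor_memory_gb):
    """Assemble the spark-submit arguments for the three executor settings.

    Hypothetical helper for illustration: it only builds a list of strings
    and does not launch anything.
    """
    return [
        "spark-submit",
        "--class", class_name,
        "--num-executors", str(num_executors),          # spark.executor.instances
        "--executor-cores", str(executor_cores),        # spark.executor.cores
        "--executor-memory", f"{executor_memory_gb}g",  # spark.executor.memory
    ]


# Example values are made up; the application jar and its arguments are omitted.
args = build_spark_submit_args("com.example.Main", 29, 5, 19)
print(" ".join(args))
```

Keeping the values in one place like this makes it easier to try several combinations without retyping the command.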
These parameters are all, in theory, bound by the capacity of the hardware running Spark, or by the share of it that the user is willing to use (the user might want, or have, to use less than the full capacity). In practice one could try to use more resources than are available, but this would lead to errors (e.g. out-of-memory errors) as soon as the YARN resource manager tries to access the non-existing resource.
Ideally one would have a few cores per executor to allow parallel processing of tasks. Interestingly, there is a best-case scenario of five cores per executor, beyond which one starts hitting diminishing returns (as thoroughly described by Cloudera).
So how does one find the best combination of parameters? More executors is usually better (one can perform more tasks in parallel), but more memory per executor is also better (which forces one to have fewer executors). One would also like to use more than one core per executor (ideally five, but not more than that). More cores per executor means fewer executors overall, but the same task-processing ability thanks to multitasking.
This post explains in detail how one can configure our parameters of interest, using a practical example and discussing tiny vs fat executors.
Tiny vs fat: tiny executors have only one core each, which means having as many executors per node as there are available cores.
Fat executors use all the cores on a single node, so that there is only one executor per cluster node.
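The two extremes can be written down as a tiny sketch. The function name and the 16-core example are only illustrative.

```python
def executor_layouts(cores_per_node):
    """Return (executors_per_node, cores_per_executor) for the two extremes."""
    tiny = (cores_per_node, 1)  # one core per executor, one executor per core
    fat = (1, cores_per_node)   # a single executor that owns every core
    return {"tiny": tiny, "fat": fat}


layouts = executor_layouts(16)
for name, (executors, cores) in layouts.items():
    print(f"{name}: {executors} executor(s) per node, {cores} core(s) each")
```

In both layouts the product of executors per node and cores per executor equals the number of cores, which is why neither leaves a core unused.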
Here are the main steps for configuring the parameters on a Spark cluster given the number of cluster nodes, the number of cores per node, and the amount of RAM per node:
spark.executor.cores to find the total number of executors on the cluster;
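As a minimal sketch, one common version of this procedure can be written as follows. It assumes that one core and 1 GB of RAM per node are left to the OS and YARN daemons, that one executor slot is given up for the YARN application master, and that a fraction of each executor's memory is set aside as overhead. These reservations, the 7% overhead default, the node count of 10 and the function name are all assumptions made for illustration, not values taken from this post.

```python
def executor_config(nodes, cores_per_node, ram_gb_per_node,
                    executor_cores=5, overhead_fraction=0.07):
    """Derive the three executor settings from the hardware description."""
    # Assumption: one core and 1 GB per node are left to the OS and YARN daemons.
    available_cores = cores_per_node - 1
    available_ram_gb = ram_gb_per_node - 1

    executors_per_node = available_cores // executor_cores
    if executors_per_node < 1:
        raise ValueError("executor_cores does not fit on a single node")

    # Assumption: one executor slot is given up for the YARN application master.
    instances = executors_per_node * nodes - 1

    # Split the node's RAM between its executors, then subtract the overhead share.
    ram_per_executor_gb = available_ram_gb / executors_per_node
    memory_gb = int(ram_per_executor_gb * (1 - overhead_fraction))

    return {
        "spark.executor.instances": instances,
        "spark.executor.cores": executor_cores,
        "spark.executor.memory": f"{memory_gb}g",
    }


# Hypothetical cluster: 10 nodes with 16 cores and 64 GB of RAM each.
config = executor_config(nodes=10, cores_per_node=16, ram_gb_per_node=64)
print(config)
```

Under these assumptions the hypothetical cluster ends up with 29 executors of 5 cores and 19g each. Different reservation or overhead conventions would shift the numbers slightly.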
The aforementioned post optimizes the configuration for a specific example (16 cores/node, 64GB RAM/node). However, using this approach for different hardware configurations would leave unused cores in each node.
For example, with 8 cores per node (7 usable once one core is set aside for YARN), using 5 executor cores would waste 2 cores per node, as only one executor per node would fit. One solution would be to set spark.executor.cores=3 so that only one core is wasted.
In general, the only solutions that never leave any core unused are tiny executors or fat executors, but both options have drawbacks, as already mentioned.
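The arithmetic behind this example can be checked with a short sketch. It assumes, as the numbers in the example imply, that one of the 8 cores per node is reserved, leaving 7 for executors. The function name is hypothetical.

```python
def leftover_cores(available_cores, executor_cores):
    """Return (executors_per_node, unused_cores) for a given executor size."""
    executors_per_node = available_cores // executor_cores
    unused_cores = available_cores - executors_per_node * executor_cores
    return executors_per_node, unused_cores


available = 8 - 1  # assumption: one core per node is reserved for YARN
for executor_cores in range(1, available + 1):
    executors, unused = leftover_cores(available, executor_cores)
    print(f"{executor_cores} core(s) per executor: "
          f"{executors} executor(s) per node, {unused} unused core(s)")
```

With 7 available cores, only the tiny (1 core) and fat (7 cores) layouts leave nothing unused; 5 cores per executor leaves 2 and 3 cores per executor leaves 1.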
The goal here is to have a general solution that can adapt to any hardware configuration and waste as few resources as possible.
Solving this problem comes down to finding the best number of cores per executor given the underlying hardware. Once that is set, the previously listed procedure works perfectly well as is.
I have implemented the whole procedure in a small web application based on Flask, which you can find here. Here I will briefly describe the algorithm used to find a best-fit number of cores per executor.
This function, written in Python, finds a suitable value for spark.executor.cores:
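def calc_executor_cores(available_cores):
    executor_cores_max = 5
    # Start from the largest value allowed: at most five cores,
    # and at most half of the available cores (but at least one).
    if available_cores >= executor_cores_max:
        executor_cores = min(executor_cores_max, available_cores // 2)
    else:
        executor_cores = max(1, available_cores // 2)
    # Cores left over once the node is filled with executors of this size.
    remainder_cores = available_cores % executor_cores
    # Shrink the executors until at most one core is left unused.
    while remainder_cores > 1 and executor_cores > 2:
        executor_cores -= 1
        remainder_cores = available_cores % executor_cores
    return executor_cores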
The function takes the number of available cores per node (after removing the core reserved for YARN) as input and returns the calculated number of cores per executor.
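Its main steps are:
set executor_cores to a number that can fit the available cores (at most five and at most half of the available cores, but at least one);
compute how many cores would be left unused with that value;
while more than one core is left unused and executor_cores is greater than two, decrease executor_cores by one and recompute the unused cores.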
In short, the best-case scenario is that all the cores are used with executor_cores=5, while the worst-case scenario is executor_cores=2 with one unused core.
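To see these cases concretely, the function can be run over a range of core counts. The sketch below repeats the function so that it runs on its own; the range of 4 to 32 available cores is an arbitrary choice.

```python
def calc_executor_cores(available_cores):
    executor_cores_max = 5
    if available_cores >= executor_cores_max:
        executor_cores = min(executor_cores_max, available_cores // 2)
    else:
        executor_cores = max(1, available_cores // 2)
    remainder_cores = available_cores % executor_cores
    while remainder_cores > 1 and executor_cores > 2:
        executor_cores -= 1
        remainder_cores = available_cores % executor_cores
    return executor_cores


results = {}
for available in range(4, 33):
    cores = calc_executor_cores(available)
    unused = available % cores
    results[available] = (cores, unused)
    print(f"{available} available core(s): executor_cores={cores}, unused={unused}")

# Sanity check: no configuration in this range wastes more than one core.
assert all(unused <= 1 for _, unused in results.values())
```

For instance, 15 available cores give executor_cores=5 with nothing unused, 7 give executor_cores=3 with one core unused, and 5 give executor_cores=2 with one core unused.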
This is not a very hard problem, just a very tedious one. My hope is that people find my app useful and never have to worry about their Spark configuration again.