How to Optimize your Spark Jobs

PUBLISHED ON JUL 2, 2019 — BIG DATA, DATA ENGINEERING

TL;DR: Spark executor setup is crucial to the performance of a Spark cluster. Executor parameters can be tuned to your hardware configuration in order to reach optimal resource usage. I built a small web app that lets you do just that.

Spark executors

Spark jobs make use of executors, which are task-running processes that each run on a node of the cluster. A Spark job is subdivided into tasks that are distributed to the executors according to the type of operation and the underlying structure of the data.

Executors can run multiple tasks simultaneously and use as much of the physical RAM available on their node as they are allowed. One executor only runs on a single node (usually a single machine or VM). The main configuration is determined by the following parameters:

  • spark.executor.instances defines the total number of executors available to Spark;
  • spark.executor.cores defines how many CPU cores each executor is allowed to use. This directly impacts its ability to run tasks in parallel;
  • spark.executor.memory defines how much RAM each executor can use.

This is equivalent to using the spark-submit tool and specifying the parameters on the command line:

spark-submit --class <CLASS_NAME> \
 --num-executors ? \
 --executor-cores ? \
 --executor-memory ? [...]
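
The same settings can also be applied programmatically when building the session. Here is a minimal PySpark sketch; the application name and the values are placeholders of my own, not recommendations:

from pyspark.sql import SparkSession

# Build a session with explicit executor settings (placeholder values)
spark = (
    SparkSession.builder
    .appName("tuned-job")
    .config("spark.executor.instances", "10")
    .config("spark.executor.cores", "5")
    .config("spark.executor.memory", "19g")
    .getOrCreate()
)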

These parameters are all, in theory, bound by the capacity of the hardware running Spark, or by the share of it that the user is willing (or allowed) to use. In practice one could try to request more resources than are available, but this leads to errors as soon as the YARN resource manager tries to allocate the non-existing resource (e.g. out-of-memory errors).
Ideally one would have a few cores per executor to allow parallel processing of tasks. Interestingly, there is a sweet spot of five cores per executor beyond which one starts hitting diminishing returns (as thoroughly described by Cloudera).

The parameter-fitting conundrum

So how does one find the best combination of parameters? More executors is usually better (one can perform more tasks in parallel), but more memory per executor is also desirable (which forces one to have fewer executors). One would also like more than one core per executor (ideally five, but not more than that). More cores per executor means fewer executors overall, but similar task-processing ability thanks to multitasking.
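
To make the trade-off concrete, here is a small illustration with hypothetical numbers (15 usable cores and 63 GB of usable RAM per node are my own assumptions for the example):

# Illustrative only: how cores per executor trades off against
# executors per node and memory per executor, on a hypothetical node
# with 15 usable cores and 63 GB of usable RAM.
usable_cores, usable_ram_gb = 15, 63
for cores in (1, 3, 5, 15):
    executors = usable_cores // cores
    memory = usable_ram_gb // executors
    print(f"{cores} cores/executor -> {executors} executors/node, ~{memory} GB each")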

A prescription for fixing parameters

This post explains in detail how one can configure the parameters of interest, using a practical example and discussing tiny vs. fat executors.

Tiny vs. fat: tiny executors have only one core each, which means having as many executors per node as there are available cores.
Fat executors use all the cores of a node, so that there is only one executor per cluster node. On a 16-core node, for example, tiny executors mean sixteen 1-core executors, while a fat executor means a single 16-core executor.

Step-by-step procedure

Here are the main steps for configuring the parameters on a Spark cluster, given the number of cluster nodes, the number of cores per node, and the amount of RAM per node (a Python sketch of the whole calculation follows the list):

  1. Reserve one core per node for YARN/Hadoop daemons;
  2. Use the remaining cores to count the total available cores in the cluster;
  3. Set spark.executor.cores=5;
  4. Divide the total available cores by spark.executor.cores to find the total number of executors on the cluster;
  5. Reserve one executor for the application master (reduce the number of executors by one). Use the resulting value to set spark.executor.instances;
  6. Calculate the number of executors per node by dividing the number of executors by the number of nodes in the cluster (rounding down to the nearest integer);
  7. Calculate the memory per executor by dividing the total node RAM by the number of executors per node;
  8. Reduce the executor memory by 7% to account for the YARN/Hadoop heap overhead, and use the resulting number as spark.executor.memory.
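
Here is a minimal Python sketch of the procedure above. The function name, the truncation of the memory value, and the example cluster (10 nodes with 16 cores and 64 GB of RAM each) are my own choices for illustration:

def spark_config(num_nodes, cores_per_node, ram_per_node_gb):
    # 1. Reserve one core per node for YARN/Hadoop daemons
    usable_cores_per_node = cores_per_node - 1
    # 2. Total available cores in the cluster
    total_cores = usable_cores_per_node * num_nodes
    # 3. Five cores per executor
    executor_cores = 5
    # 4. Total executors that fit in the cluster
    total_executors = total_cores // executor_cores
    # 5. Reserve one executor for the application master
    executor_instances = total_executors - 1
    # 6. Executors per node, rounded down
    executors_per_node = total_executors // num_nodes
    # 7. Memory per executor
    memory_per_executor = ram_per_node_gb / executors_per_node
    # 8. Leave ~7% for the YARN/Hadoop heap overhead
    executor_memory_gb = int(memory_per_executor * 0.93)
    return {
        "spark.executor.cores": executor_cores,
        "spark.executor.instances": executor_instances,
        "spark.executor.memory": f"{executor_memory_gb}g",
    }

print(spark_config(10, 16, 64))
# {'spark.executor.cores': 5, 'spark.executor.instances': 29, 'spark.executor.memory': '19g'}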

The aforementioned post optimizes the configuration for a specific example (16 cores/node, 64 GB RAM/node). However, using this approach with a different hardware configuration can leave unused cores on each node.

For example, with 8 cores per node, one core is reserved for YARN, leaving 7; using 5 executor cores would then waste 2 cores per node, as only one executor per node would fit. One solution would be to set spark.executor.cores=3 so that only one core is wasted. In general the only solutions that never leave any core unused are tiny executors or fat executors, but both options have drawbacks, as already mentioned.

The goal here is to have a general solution that can adapt to any hardware configuration and waste as few resources as possible.

An automated solution

Solving this problem comes down to finding the best number of cores per executor given the underlying hardware. Once that is set, the previously listed procedure works perfectly well as is.

I have implemented the whole procedure in a small web application based on Flask, which you can find here. Below I briefly describe the algorithm used to find a best-fit value for spark.executor.cores.

The algorithm

This function written in Python finds a suitable value for spark.executor.cores:

def calc_executor_cores(available_cores):
    """Find a suitable number of cores per executor for one node."""
    executor_cores_max = 5
    if available_cores >= executor_cores_max:
        # Start from at most 5 cores, capped at half the available cores
        # so that at least two executors fit on the node
        executor_cores = min(executor_cores_max, available_cores // 2)
    else:
        # Fewer than 5 cores available: start from half of them (at least one)
        executor_cores = max(1, available_cores // 2)
    # Cores that would be left unused on the node with this choice
    remainder_cores = available_cores % executor_cores
    # Shrink the executor until at most one core is wasted (never below 2 cores)
    while remainder_cores > 1 and executor_cores > 2:
        executor_cores -= 1
        remainder_cores = available_cores % executor_cores
    return executor_cores

The function takes the number of available cores per node (after setting aside the core reserved for YARN) as input and returns the calculated value for executor_cores. Its main steps are:

  • initialize executor_cores to a number that fits within the available cores;
  • reduce it by one unit at a time until at most one core is left unused or executor_cores=2;
  • return the resulting executor_cores.

In short, the best-case scenario is that all cores are used with executor_cores=5, while the worst-case scenario is executor_cores=2 with one unused core per node.
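
For instance, here is what the function above returns for a few hypothetical per-node core counts:

# Available cores per node (after the YARN core) -> chosen cores per executor
print(calc_executor_cores(15))  # 5: three 5-core executors, no core wasted
print(calc_executor_cores(7))   # 3: two 3-core executors, one core wasted
print(calc_executor_cores(13))  # 4: three 4-core executors, one core wasted
print(calc_executor_cores(3))   # 1: three 1-core (tiny) executors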

Conclusion

This is not a very hard problem; it is just very tedious. My hope is that people will find my app useful and never have to worry about their Spark configuration again.

TAGS: FLASK, PYTHON, SPARK, YARN