Morrissss

Reputation: 324

Will increasing spark.executor.cores make shuffling faster

Suppose we fix the total number of cores and the total memory size of a Spark job, and there are plenty of partitions in the input data. Compare these two configurations:

  • 100 executors, 10G memory and 1 core for each executor
  • 20 executors, 50G memory and 5 cores for each executor

Here are my questions:

  1. Sometimes I find NODE_LOCAL tasks get their input from the network rather than from memory/disk. Does that actually mean communication between two executor processes on the same machine?
  2. If 1 is true, will the second one be faster since shuffling can be more "process local"?
  3. If there are only map tasks, will the second one be as fast as the first one?
  4. Can I say that the main trade-off between #executors and #executor cores is IO?

Thanks

Upvotes: 2

Views: 3885

Answers (2)

Kelcey Damage

Reputation: 11

Q4. Can I say that the main trade-off between #executors and #executor cores is IO?

The main trade-offs are memory management and IO. If you run some simple multi-stage DAGs with simple maps and reduces, you will see in the Spark history server that 4 nodes with 2 cores each process just as many tasks in parallel as 1 node with 8 cores. You will still have deserialization occur with each initial task load at the start of a stage.

The largest difference is in memory-overhead reduction, which is not such a big issue on modern servers with lots of RAM.

You would be hard pressed to ever find someone recommend more than 5 cores per executor, due to a performance fall-off curve in the thread scheduler.

A good rule of thumb for starting cluster/executor sizing is as follows:

(ceiling(total_host_physical_cores * 1.15) - 2) / executor.cores = number of executors per host

Starting with executor.cores = 2 is a good safe bet.
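As a minimal sketch, the arithmetic behind that rule of thumb could be written out like this. The 16-physical-core host below is just an assumed example value, not part of the original recommendation:

    // Sketch of the sizing rule above, not an official Spark formula.
    // Assumes a hypothetical host with 16 physical cores and executor.cores = 2.
    object ExecutorSizing {
      def executorsPerHost(physicalCores: Int, executorCores: Int): Int = {
        val usableCores = math.ceil(physicalCores * 1.15).toInt - 2
        usableCores / executorCores
      }

      def main(args: Array[String]): Unit = {
        // (ceil(16 * 1.15) - 2) / 2 = 17 / 2 = 8 executors per host
        println(executorsPerHost(physicalCores = 16, executorCores = 2))
      }
    }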

Memory per executor is more about your DAG and your ingest size.

Upvotes: 1

Arvind Kumar

Reputation: 1335

Q1. Sometimes I find NODE_LOCAL tasks get their input from the network rather than from memory/disk. Does that actually mean communication between two executor processes on the same machine?

NODE_LOCAL tasks may get their input from other executors on the same node, or the data may need to be retrieved from systems such as HDFS or a cache. So yes, a NODE_LOCAL task can mean communication between two executor processes on the same node. RACK_LOCAL means the data is on another node and therefore needs to be transferred before execution.

Q2. If 1 is true, will the second one be faster since shuffling can be more "process local"?

  • 100 executors, 10G memory and 1 core for each executor
  • 20 executors, 50G memory and 5 cores for each executor

1 is true, but deciding which option would be faster depends on several factors (number of executors vs. number of executor cores).

Spark-submit parameters such as the number of executors, the number of executor cores, and the executor memory impact the amount of data Spark can cache, as well as the maximum sizes of the shuffle data structures used for grouping, aggregations, and joins. Running executors with too much memory often results in excessive garbage collection delays.
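As a minimal sketch, the second configuration from the question (20 executors, 50G memory, 5 cores each) might be expressed like this. The application name is a placeholder, and in practice these settings are usually passed on the spark-submit command line rather than in code:

    import org.apache.spark.sql.SparkSession

    // Sketch only: the second configuration from the question.
    val spark = SparkSession.builder()
      .appName("executor-sizing-example")          // hypothetical app name
      .config("spark.executor.instances", "20")    // 20 executors
      .config("spark.executor.memory", "50g")      // 50G memory each
      .config("spark.executor.cores", "5")         // 5 cores each
      .getOrCreate()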

The cores property controls the number of concurrent tasks an executor can run. It has been observed that about five tasks per executor can achieve full write throughput. A larger number of cores per executor degrades HDFS I/O throughput and can thus significantly slow down the application.

On the other hand, running executors with a single core and little memory throws away the benefits that come from running multiple tasks in a single JVM. For example, broadcast variables need to be replicated once per executor, so many small executors result in many more copies of the data.
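To make the broadcast point concrete, here is a minimal sketch with a hypothetical lookup table: the broadcast value is materialised once per executor JVM, so a job run with many single-core executors holds many copies of it.

    import org.apache.spark.sql.SparkSession

    // Sketch: one copy of `lookup` lives in each executor JVM.
    val spark = SparkSession.builder().appName("broadcast-example").getOrCreate()
    val lookup = spark.sparkContext.broadcast(Map("a" -> 1, "b" -> 2)) // hypothetical lookup table

    val data = spark.sparkContext.parallelize(Seq("a", "b", "a"))
    val total = data.map(k => lookup.value.getOrElse(k, 0)).reduce(_ + _)
    println(total)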

To optimize Spark's memory consumption, identify how much memory your dataset requires. For that, you can create a DataFrame, cache it, and check the dataset size in the Spark UI's Storage tab. Based on the dataset size and the type of operation, you can derive an optimal number of executors and cores.
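A minimal sketch of that sizing check, with a placeholder input path:

    import org.apache.spark.sql.SparkSession

    // Cache the dataset, force it to materialise, then read its in-memory
    // size from the Spark UI's Storage tab.
    val spark = SparkSession.builder().appName("dataset-size-check").getOrCreate()
    val df = spark.read.parquet("/path/to/dataset")  // placeholder path
    df.cache()
    df.count()  // an action is needed so the cached blocks appear in the Storage tab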

Alternatively, you can avoid setting all of these properties by turning on dynamic allocation with the spark.dynamicAllocation.enabled property. Dynamic allocation enables a Spark application to request executors when there is a backlog of pending tasks and to free up executors when they are idle.
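A minimal sketch of enabling dynamic allocation; the min/max executor values are assumptions for illustration only:

    import org.apache.spark.sql.SparkSession

    // Sketch: let Spark scale executors up and down instead of fixing their count.
    val spark = SparkSession.builder()
      .appName("dynamic-allocation-example")
      .config("spark.dynamicAllocation.enabled", "true")
      .config("spark.dynamicAllocation.minExecutors", "2")   // assumed value
      .config("spark.dynamicAllocation.maxExecutors", "50")  // assumed value
      .config("spark.shuffle.service.enabled", "true")       // typically required for dynamic allocation
      .getOrCreate()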

Q3. If there are only map tasks, will the second one be as fast as the first one?

Maybe yes. As per the recommendations from Cloudera, the second option is better than the first, but it depends on the dataset size.

Q4. Can I say that the main trade-off between #executors and #executor cores is IO?

Not sure about this, but the recommendation is to have as many executors as data nodes and as many cores as you can get from the cluster.

Upvotes: 5
