josiah
josiah

Reputation: 1414

Why is each Spark Task not utilizing all allocated cores?

Assume I have 36 cores per executor, one executor per node, and 3 nodes each with 48 cores available. The basic gist of what I've noticed is, when I set each task to use 1 core (the default), my CPU utilization over the workers is about 70% and 36 tasks will execute simultaneously per executor (as I would have expected). However, when I change my configuration to have 6 cores per task (--conf spark.task.cpus=6), I get the drop to 6 tasks at a time per executor (as expected), but my CPU utilization also drops below 10% utilization (unexpected). I would have assumed that Spark would know how to parallelize the workload over the 6 cores.

The implementation details that are important are that I am running a UDF function on a column of a DataFrame and appending the results as a new column on that dataframe. This UDF function uses a @transient object that provides a machine learning algorithm that I'm using. This UDF function is not part of an aggregation or coalesce operation, it is just a map operation over the column implemented like so:

def myUdf = udf { ... }

val resultSet = myUdf(dataFrame.col("originalCol"))
val dataFrameWithResults = dataFrame.withColumn("originalColMetric", resultSet)

I would have expected that Spark would execute 6 myUdf to process 6 records at a time, one for each core, but this doesn't appear to be the case. Is there a way to fix this (without submitting a PR to the Spark project), or at least, can someone explain why this might be happening?

Anticipating the question, I'm experimenting with increasing the number of cores per task in order to reduce the amount of RAM required per executor. Executing too many tasks at once exponentially increases the RAM usage, in this instance.

Upvotes: 3

Views: 3494

Answers (1)

Alper t. Turker
Alper t. Turker

Reputation: 35249

spark.task.cpus is a number of cores to allocate for each task. It is used to allocate multiple cores to a single task, in case when user code is multi-threaded. If your udf doesn't use multiple (doesn't spawn multiple threads in a single function call) threads then the cores are just wasted.

to process 6 records at a time

allocate 6 cores, with spark.task.cpus set to 1. If you want to limit number of tasks on node, then reduce number of cores offered by each node.

Essentially Spark can determine on its own how to split out mapping a UDF over multiple records concurrently by splitting the records up among each of the Tasks (according to the partitioning) and determining how many simultaneous Tasks each Executor can handle. However, Spark can NOT automatically split the work per Core per Task. To utilize multiple cores per task, the code in the UDF, which would get executed over one record at a time (sequentially) per Task, would need to be written to parallelize the computation in that UDF over a single record.

Upvotes: 3

Related Questions