Raphael Roth

Reputation: 27373

Is CPU usage in Apache Spark limited?

I recently discovered that adding parallel computation (e.g. using parallel collections) inside UDFs increases performance considerably, even when running Spark in local[1] mode or using Yarn with 1 executor and 1 core.

E.g. in local[1] mode, the Spark job consumes as much CPU as possible (i.e. 800% if I have 8 cores, measured using top).

This seems strange, because I thought Spark (or Yarn) limits the CPU usage per Spark application.

So I wonder why that is, and whether it's recommended to use parallel processing/multi-threading inside Spark, or whether I should stick to Spark's own parallelizing pattern.

Here is an example to play with (times measured in Yarn client mode with 1 instance and 1 core):

case class MyRow(id:Int,data:Seq[Double])

// create dataFrame
val rows = 10
val points = 10000
import scala.util.Random.nextDouble
val data = {1 to rows}.map{i => MyRow(i, Stream.continually(nextDouble()).take(points))}
val df = sc.parallelize(data).toDF().repartition($"id").cache()

df.show() // trigger computation and caching

// some expensive dummy-computation for each array-element
val expensive = (d:Double) => (1 to 10000).foldLeft(0.0){case(a,b) => a*b}*d

val serialUDF = udf((in:Seq[Double]) => in.map{expensive}.sum)
val parallelUDF = udf((in:Seq[Double]) => in.par.map{expensive}.sum)

df.withColumn("sum",serialUDF($"data")).show() // takes ~ 10 seconds
df.withColumn("sum",parallelUDF($"data")).show() // takes ~ 2.5 seconds

Upvotes: 4

Views: 1427

Answers (2)

user2953788

Reputation: 167

This is an example of how Spark's CPU usage is configured:

val conf = new SparkConf()
             .setMaster("local[2]")
             .setAppName("CountingSheep")
val sc = new SparkContext(conf)

If you change the master to local[*], Spark will utilize all of your CPU cores.
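
For the question's example this only helps if there are also enough partitions to keep those cores busy. A minimal sketch, reusing df and serialUDF from the question and assuming the shell was started with --master local[*] on an 8-core machine:

// with 8 cores available to Spark, 8 partitions let 8 tasks run the
// (serial) UDF concurrently, without any manual threading in the UDF
df.repartition(8)
  .withColumn("sum", serialUDF($"data"))
  .show()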

Upvotes: 1

Assaf Mendelson

Reputation: 12991

Spark does not limit CPU directly; instead it defines the number of concurrent threads Spark creates. So for local[1] it would basically run only one task at a time. When you do in.par.map{expensive} you are creating threads which Spark does not manage and which are therefore not covered by this limit. In other words, you told Spark to limit itself to a single thread and then created other threads without Spark knowing it.
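
To illustrate the point (this is not part of the answer): the threads behind .par come from a pool that is by default sized to the machine's available processors, regardless of local[1], which explains the ~800% CPU. If you do use parallel collections, their thread count can at least be capped with a custom task support. A minimal sketch, assuming Scala 2.11 (on Scala 2.12+ the pool class is java.util.concurrent.ForkJoinPool):

import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

val cappedParallelUDF = udf((in: Seq[Double]) => {
  val par = in.par
  // replace the default pool (sized to Runtime.getRuntime.availableProcessors)
  // with one limited to 2 worker threads
  par.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(2))
  par.map(expensive).sum
})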

In general, it is not a good idea to spawn your own parallel threads inside a Spark operation. Instead, it is better to tell Spark how many threads it can work with and make sure you have enough partitions for parallelism.
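
A minimal sketch of that Spark-native approach (not from the answer itself): explode the array so each element becomes a row, let Spark spread the expensive work across tasks, and aggregate back per id. It reuses df and expensive from the question; the partition count of 8 is an arbitrary assumption and only pays off if Spark actually has that many cores:

import org.apache.spark.sql.functions._

val expensiveUDF = udf(expensive)            // same per-element function as above

df.select($"id", explode($"data").as("d"))   // one row per array element
  .repartition(8)                            // spread the work over 8 partitions/tasks
  .withColumn("v", expensiveUDF($"d"))
  .groupBy($"id")
  .agg(sum($"v").as("sum"))
  .show()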

Upvotes: 2
