Reputation: 27373
I recently discovered that adding parallel computing (e.g. using parallel collections) inside UDFs increases performance considerably, even when running Spark in local[1] mode or using YARN with 1 executor and 1 core.
E.g. in local[1] mode, the Spark job consumes as much CPU as possible (i.e. 800% if I have 8 cores, measured using top).
This seems strange, because I thought Spark (or YARN) limits the CPU usage per Spark application?
So I wonder why that is, and whether it's recommended to use parallel processing / multi-threading in Spark, or whether I should stick to Spark's own parallelization pattern.
Here is an example to play with (times measured in YARN client mode with 1 executor instance and 1 core):
case class MyRow(id:Int,data:Seq[Double])
// create dataFrame
val rows = 10
val points = 10000
import scala.util.Random.nextDouble
val data = {1 to rows}.map{i => MyRow(i, Stream.continually(nextDouble()).take(points))}
val df = sc.parallelize(data).toDF().repartition($"id").cache()
df.show() // trigger computation and caching
// some expensive dummy-computation for each array-element
val expensive = (d:Double) => (1 to 10000).foldLeft(0.0){case(a,b) => a*b}*d
import org.apache.spark.sql.functions.udf
val serialUDF = udf((in:Seq[Double]) => in.map{expensive}.sum)
val parallelUDF = udf((in:Seq[Double]) => in.par.map{expensive}.sum)
df.withColumn("sum",serialUDF($"data")).show() // takes ~ 10 seconds
df.withColumn("sum",parallelUDF($"data")).show() // takes ~ 2.5 seconds
Upvotes: 4
Views: 1427
Reputation: 167
Here is an example of how Spark's CPU usage is configured:
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("CountingSheep")
val sc = new SparkContext(conf)
Change the master to local[*] and it will utilize all of your CPU cores.
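For the YARN case mentioned in the question, a minimal sketch of the equivalent knobs, using the standard spark.executor.* properties (the values are just placeholders; in practice these are usually passed via spark-submit rather than hard-coded):
// assumed values: 1 executor with 1 core, matching the question's setup
val yarnConf = new SparkConf()
  .setMaster("yarn")
  .setAppName("CountingSheep")
  .set("spark.executor.instances", "1") // number of executors
  .set("spark.executor.cores", "1")     // cores per executor
// pass yarnConf to the SparkContext / SparkSession as above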
Upvotes: 1
Reputation: 12991
Spark does not limit CPU directly; instead it defines the number of concurrent threads Spark creates. So for local[1] it will basically run one task at a time. When you do in.par.map{expensive}, you are creating threads which Spark does not manage and which are therefore not covered by this limit. I.e. you told Spark to limit itself to a single thread and then created other threads without Spark knowing it.
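Note that a parallel collection uses, by default, a ForkJoinPool sized to all available cores, which is why the local[1] job can still show ~800% CPU on an 8-core machine. If you really wanted to keep the .par approach, a minimal sketch of capping that pool (assuming Scala 2.12+, where ForkJoinTaskSupport takes a java.util.concurrent.ForkJoinPool; creating one pool per call is wasteful and only for illustration):
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport
val cappedParallelUDF = udf((in: Seq[Double]) => {
  val par = in.par
  // cap the parallel collection at 2 worker threads instead of all cores
  par.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(2))
  par.map(expensive).sum
})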
In general, it is not a good idea to spawn parallel threads inside a Spark operation. Instead, it is better to tell Spark how many threads it can work with and to make sure you have enough partitions for parallelism.
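A minimal sketch of what that could look like for the example in the question: explode the arrays so each element becomes its own row, let Spark spread the expensive work over partitions it manages, and then re-aggregate (the partition count of 8 and the column names are assumptions for illustration):
import org.apache.spark.sql.functions.{explode, sum, udf}
// hypothetical per-element UDF reusing the `expensive` function from the question
val expensiveUDF = udf((d: Double) => expensive(d))
val viaSparkParallelism = df
  .select($"id", explode($"data").as("d")) // one row per array element
  .repartition(8)                          // assumed: match this to the cores given to Spark
  .withColumn("v", expensiveUDF($"d"))
  .groupBy($"id")
  .agg(sum($"v").as("sum"))
viaSparkParallelism.show()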
Upvotes: 2