wdz

Reputation: 437

Spark performance - how to parallelize large loops?

I have a Spark application that runs 8,000 loop iterations in total on a cluster of 5 nodes. Each node has 125 GB of memory and 32 cores. The code in question looks like the following:

for (m <- 0 until deviceArray.size) { // there are 1000 devices
  var id = deviceArray(m)

  for (t <- 1 to timePatterns) { // there are 8 time patterns
    var hrpvData = get24HoursPVF(dataDF, id, t).cache()

    var hrpvDataZI = hrpvData.zipWithIndex

    var clustersLSD = runKMeans(hrpvData, numClusters, numIterations)

    var clusterPVPred = hrpvData.map(x => clustersLSD.predict(x))
    var clusterPVMap = hrpvDataZI.zip(clusterPVPred)

    var pvhgmRDD = clusterPVMap.map { r => (r._2, r._1._2) }.groupByKey

    var arrHGinfo = pvhgmRDD.collect

    // Post process data
    // .....

    hrpvData.unpersist()
  }
}

The function call get24HoursPVF() prepares feature vectors for k-means and takes about 40 seconds. Each loop iteration takes about 50 seconds to finish on the cluster. My data size is 2 to 3 GB (read from tables). With 8,000 iterations, the total running time of this Spark application is unacceptable (8,000 × 50 s).

Since each device is independent, is there any way to parallelize the 8,000 iterations? Or how can I use the cluster to cut the total running time? Scala Futures won't work: they merely submit the jobs nearly simultaneously, but Spark won't actually run those jobs concurrently.

Upvotes: 2

Views: 3375

Answers (1)

Ewan Leith

Reputation: 1665

Aside from the for loops, you've got two of the slowest API calls in Spark in your code there: groupByKey and collect.

groupByKey should almost never be used; look at reduceByKey instead (see this Databricks blog for more details).
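
As a minimal sketch of the difference (assuming a pair RDD shaped like the (clusterId, index) pairs your clusterPVMap.map produces, with toy data here), reduceByKey combines values within each partition before the shuffle, so far less data crosses the network than with groupByKey:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("reduceByKeyVsGroupByKey").getOrCreate()
val sc = spark.sparkContext

// Toy (clusterId, rowIndex) pairs standing in for pvhgmRDD's input
val pairs = sc.parallelize(Seq((0, 1L), (0, 2L), (1, 3L), (1, 4L), (1, 5L)))

// groupByKey: every pair is shuffled before the per-cluster sizes are computed
val sizesViaGroup = pairs.groupByKey().mapValues(_.size)

// reduceByKey: partial counts are combined map-side, so only one value
// per key per partition is shuffled
val sizesViaReduce = pairs.mapValues(_ => 1L).reduceByKey(_ + _)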

collect transfers all the data in that RDD to an array on the driver node; unless that's a small amount of data, it'll have a fairly big performance impact.
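
If your post-processing only needs a small summary per cluster (an assumption on my part, since that part is elided in your code), you can keep the aggregation on the executors and collect just the summary:

// Hypothetical: count rows per cluster on the executors, then collect only
// one small (clusterId, count) record per cluster instead of the whole RDD
val perClusterCounts = clusterPVMap
  .map { r => (r._2, 1L) }   // (clusterId, 1)
  .reduceByKey(_ + _)        // distributed aggregation
  .collect()                 // tiny result: one entry per cluster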

On the for loops, I'm not particularly familiar with what you're trying to do, but in

var hrpvData = get24HoursPVF(dataDF, id, t).cache()

you're building and caching a new DataFrame for each id and t value. I'm not sure why you couldn't just build one single DataFrame containing every variant of id and t at the start, then run your zipWithIndex, map, etc. over that whole DataFrame?
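
Here's a rough sketch of what I mean, assuming dataDF (or a DataFrame derived from it) has columns identifying the device and time pattern; the names deviceId and timePattern below are placeholders, so substitute your real column names:

import org.apache.spark.sql.functions._

// Build and cache one DataFrame covering every (id, t) combination up front,
// instead of calling get24HoursPVF and cache() 8,000 times
val allFeatures = dataDF
  .filter(col("deviceId").isin(deviceArray: _*))
  .filter(col("timePattern").between(1, timePatterns))
  .cache()

// Downstream work (zipWithIndex, k-means, etc.) can then be keyed by
// (deviceId, timePattern) over this single DataFrame
allFeatures.count() // materialise the cache once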

Upvotes: 5
