Reputation: 437
I have a Spark application that runs 8000 loop iterations in total on a cluster of 5 nodes. Each node has 125 GB of memory and 32 cores. The code in question looks like the following:
for (m <- 0 until deviceArray.size) { // there are 1000 devices
  var id = deviceArray(m)
  for (t <- 1 to timePatterns) { // there are 8 time patterns
    var hrpvData = get24HoursPVF(dataDF, id, t).cache()
    var hrpvDataZI = hrpvData.zipWithIndex
    var clustersLSD = runKMeans(hrpvData, numClusters, numIterations)
    var clusterPVPred = hrpvData.map(x => clustersLSD.predict(x))
    var clusterPVMap = hrpvDataZI.zip(clusterPVPred)
    var pvhgmRDD = clusterPVMap.map { r => (r._2, r._1._2) }.groupByKey
    var arrHGinfo = pvhgmRDD.collect
    // Post-process the data
    // .....
    hrpvData.unpersist()
  }
}
The function get24HoursPVF() prepares the feature vectors for k-means and takes about 40 seconds per call. Each loop iteration takes about 50 seconds to finish on the cluster. My data size is 2 to 3 GB (read from tables). With 8000 iterations, the total running time of this Spark application (8000 x 50 s) is unacceptable.
Since each device is independent, is there any way to parallelize the 8000 iterations? Or how can I use the cluster to bring down the total running time? Scala Futures won't work: they only submit the jobs nearly simultaneously, and Spark won't actually run those jobs at the same time.
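Roughly what I mean by the Future approach (a sketch only; processOneDevice is a hypothetical wrapper around the body of the inner loop above):

// Sketch: wrap each (device, time pattern) iteration in a Future so the driver
// submits the corresponding Spark jobs from many threads at once.
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

val jobs = for {
  id <- deviceArray.toSeq
  t  <- 1 to timePatterns
} yield Future {
  processOneDevice(id, t) // hypothetical wrapper around the loop body
}

Await.result(Future.sequence(jobs), Duration.Inf)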
Upvotes: 2
Views: 3375
Reputation: 1665
Aside from the for loops, you've got two of the slowest API calls in Spark in your code there: groupByKey and collect.
groupByKey should almost never be used; look at reduceByKey instead, and see this Databricks blog for more details.
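To illustrate the difference (a minimal sketch with made-up (key, value) pairs, not your data):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("groupByKey-vs-reduceByKey")
  .master("local[*]") // local run just for this illustration
  .getOrCreate()
val sc = spark.sparkContext

// Made-up (deviceId, reading) pairs, just to show the shapes involved.
val readings = sc.parallelize(Seq((1, 0.5), (1, 0.7), (2, 1.2)))

// groupByKey ships every value across the shuffle before anything is combined:
val sumsSlow = readings.groupByKey().mapValues(_.sum)

// reduceByKey combines values on the map side first, so far less data is shuffled:
val sumsFast = readings.reduceByKey(_ + _)

Whether a reduceByKey (or aggregateByKey) can replace your groupByKey depends on what the post-processing actually does with arrHGinfo.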
collect transfers all the data in that RDD to an array on the driver node; unless that's a small amount of data, it'll have a fairly big performance impact.
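For example (a sketch that assumes your post-processing only needs per-cluster summaries, which may not be the case), you can keep the heavy work on the executors and only collect the small result:

// pvhgmRDD is RDD[(Int, Iterable[Long])] (cluster id -> row indices), as in the question.
// Instead of arrHGinfo = pvhgmRDD.collect (every index shipped to the driver),
// bring back only a small per-cluster summary, e.g. how many points each cluster got:
val clusterSizes: Array[(Int, Int)] = pvhgmRDD.mapValues(_.size).collect()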
On the for loops, I'm not particularly familiar with what you're trying to do, but in
var hrpvData = get24HoursPVF(dataDF, id, t).cache()
you're building and caching a new dataframe for each id and t value. I'm not sure why you couldn't just build one single dataframe containing every combination of id and t at the start, then run your zipWithIndex, map, etc. over that whole dataframe.
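Something along these lines (a rough sketch only; buildAllFeatures is a hypothetical loader that tags each feature vector with its id and t, since I don't know what get24HoursPVF does internally, and it assumes the same spark session and dataDF as in your code):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical: one DataFrame with columns (id, t, features), built in a single pass
// instead of 8000 separate get24HoursPVF calls.
val allFeatures: DataFrame = buildAllFeatures(spark, dataDF)
allFeatures.cache()
allFeatures.count() // materialise the cache once

// Each (id, t) group then becomes a cheap filter on cached data instead of a
// fresh 40-second get24HoursPVF call:
val oneGroup = allFeatures.filter(col("id") === deviceArray(0) && col("t") === 1)

Whether the per-group k-means can also be folded into fewer jobs depends on the rest of the pipeline; the point is just to avoid rebuilding the input 8000 times.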
Upvotes: 5