Reputation: 38919
I am working on code which uses an executor service to parallelize tasks (think machine learning computations done over a small dataset, over and over again). My goal is to execute some code as fast as possible, multiple times, and store the result somewhere (total executions will be on the order of at least 100M runs).
The logic looks something like this (it's a simplified example):
dbconn = new dbconn() // this is reused by all threads
for a in listOfSize1000:
    for b in listofSize10:
        for c in listOfSize2:
            taskcompletionexecutorservice.submit(new runner(a, b, c, dbconn))
At the end, taskcompletionexecutorservice.take() is called and I store the Result from the "Future" in a DB. But this approach does not really scale past a point.
So this is what I am doing right now in Spark (it is a brutal hack, but I am looking for suggestions on how best to structure this):
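To be explicit about that collection step, it looks roughly like the following (just a sketch, written in Scala here for brevity; the pool size is arbitrary and runner is assumed to be a Callable that returns a Result):
import java.util.concurrent.{Executors, ExecutorCompletionService}

val pool = Executors.newFixedThreadPool(16)                 // arbitrary pool size
val ecs = new ExecutorCompletionService[Result](pool)
val dbconn = new dbconn()                                   // shared by all tasks

var submitted = 0
for (a <- listOfSize1000; b <- listofSize10; c <- listOfSize2) {
  ecs.submit(new runner(a, b, c, dbconn))                   // runner is a Callable[Result]
  submitted += 1
}

for (_ <- 1 to submitted) {
  val r = ecs.take().get()                                  // take() blocks until the next task finishes
  dbconn.store(r)                                           // store each Result in the DB
}
pool.shutdown()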
sparkContext.parallelize(listOfSize1000).filter(a -> {
    dbconn = new dbconn() // cannot init it outside parallelize since it's not serializable
    for b in listofSize10:
        for c in listOfSize2:
            Result r = new runner(a, b, c, dbconn)
            dbconn.store(r)
    return true // it serves no purpose
}).count();
This approach looks inefficient to me, since it is not truly parallelizing on the smallest unit of work, although the job runs alright. Also, count is not really doing anything for me; I added it only to trigger the execution. It was inspired by the Pi-estimation example here: http://spark.apache.org/examples.html
So, any suggestions on how I can better structure my Spark runner so that I can use the Spark executors efficiently?
Upvotes: 2
Views: 222
Reputation: 7442
So there are a few things we can do to make this code more Spark-like. The first is that you are using a filter and count, but not really using the results of either. The function foreach is probably closer to what you want.
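For example, a rough sketch of that change could look like the following (in Scala; sc is your SparkContext, your runner and dbconn classes are assumed to be as in the question, and the per-element connection is kept for now, which the next point addresses):
sc.parallelize(listOfSize1000).foreach { a =>
  val dbconn = new dbconn()                      // still one connection per element at this point
  for (b <- listofSize10; c <- listOfSize2) {
    val r = new runner(a, b, c, dbconn).call()   // assuming runner is a Callable[Result], as in the question
    dbconn.store(r)
  }
  dbconn.close()                                 // assuming the connection needs closing
}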
That being said, you are creating a DB connection to store the result, and we can look at doing this in a few ways. One is: if the DB is really what you want to use for storage, you could use foreachPartition or mapPartitionsWithIndex to create only one connection per partition and then do a count() (which I know is a bit ugly, but foreachWith is deprecated as of 1.0.0). You could also just do a simple map and then save your results to one of the many supported output formats (e.g. saveAsSequenceFile).
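The map-and-save variant could be sketched like this (again in Scala; runSingle is a hypothetical helper standing in for one computation that does not need the DB connection, and saveAsTextFile plus the output path are just illustrative stand-ins for whichever output format you prefer):
val results = sc.parallelize(listOfSize1000).flatMap { a =>
  for (b <- listofSize10; c <- listOfSize2)
    yield runSingle(a, b, c)                     // hypothetical: one computation, returns one result
}
results.map(_.toString).saveAsTextFile("hdfs:///results/path")   // path is illustrative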
Upvotes: 1
Reputation: 1235
You may try another approach to parallelize it even better, though at a price. The code is in Scala, but there is a cartesian method for Python as well. For simplicity, my lists contain integers.
val rdd1000 = sc.parallelize(list1000)
val rdd10 = sc.parallelize(list10)
val rdd2 = sc.parallelize(list2)
rdd1000.cartesian(rdd10).cartesian(rdd2)
  .foreachPartition((tuples: Iterator[Tuple2[Tuple2[Int, Int], Int]]) => {
    val dbconn = ... // open one connection for the whole partition
    for (tuple <- tuples) {
      val a = tuple._1._1
      val b = tuple._1._2
      val c = tuple._2
      val r = new Result(a, b, c, dbconn)
      dbconn.store(r)
    }
  })
filter in your case is a transformation, which is lazy: Spark doesn't evaluate it at the call. Processing starts only when an action is called. count is an action, and it is what kicks off the actual processing in your example. foreachPartition is also an action, and Spark runs it right away. foreachPartition is needed here because it allows you to open the connection once for a whole partition of the data.
The potentially bad thing about cartesian is that it might imply a shuffle over the cluster, so if you have complex objects, that might hurt performance. That would be a concern if you were reading the data from external sources; since you're using parallelize, it's fine.
One more thing to be aware of is that, depending on the size of your cluster, Spark might put pretty high pressure on the database you use.
Upvotes: 1