Reputation: 9963
I've got some Spark code that creates a series of RDDs. At the very end I call randomSplit to split it into 3 sets and then I write each of those to disk. So the 1st stage is:
1. parallelize the fetched data
2. mapPartitions to tag and tokenize each record
3. map each record to a JSON string
4. randomSplit
Because step (4) splits things into 3 sets there are 3 distinct Spark phases here. Towards the end of the 1st stage we start running out of stage 1 tasks but have executors available.
At this point the data sets for several partitions have already been computed. As I understand it, randomSplit runs on a partition-by-partition basis; in other words, it doesn't require a shuffle or a collect - it simply randomly selects rows within each partition. If that's correct, there's no reason some of the tasks from stage 2 couldn't run on the available executors - the partitions for their RDDs have already been computed and cached. Why doesn't Spark start some of the stage 2 tasks to take advantage of the available resources?
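To make that reasoning concrete, here is a minimal Kotlin sketch of the per-partition idea. This is not Spark's actual randomSplit implementation, and splitFraction is a made-up helper: each split is derived independently by replaying the same seeded random sequence within each partition and keeping only the rows whose draw falls in that split's range, so no shuffle or collect is needed.

import org.apache.spark.api.java.JavaRDD
import kotlin.random.Random

// Illustrative only: produce one split by filtering each partition with a
// deterministic, seeded random draw per row. Running this once per split with
// the same seed but a different [lower, upper) range partitions the data
// without moving it between executors.
fun <T> splitFraction(rdd: JavaRDD<T>, lower: Double, upper: Double, seed: Long): JavaRDD<T> =
    rdd.mapPartitionsWithIndex({ partitionIndex, rows ->
        val rng = Random(seed + partitionIndex) // same draws per partition for every split
        rows.asSequence()
            .filter { val draw = rng.nextDouble(); draw >= lower && draw < upper }
            .iterator()
    }, true)

For a 60/20/20 split, for example, three calls with the same seed and the ranges [0.0, 0.6), [0.6, 0.8) and [0.8, 1.0) would carve up each partition consistently.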
Note: Clearly a "they could but they didn't" answer here is totally valid. I guess what I'm really asking is: is there some technical reason I haven't thought of that makes this impossible (or very hard), or is this just an oversight in the implementation?
Here's a simplified version of the code (in Kotlin):
fun run(sc: JavaSparkContext, opts: Options) {
    val allData = fetchABunchOfData()
    val allDataRdd = sc.parallelize(allData)
    val taggedAndTokenized = allDataRdd.mapPartitions { addTagsAndTokens(it) }

    // Convert each ResponseData to a JSON String
    val jsonStrings = taggedAndTokenized.map {
        val mapper = AnxJsonUtils.getMapper()
        mapper.writeValueAsString(it)
    }

    // The randomSplit below creates 3 distinct RDD lineages, so if we don't cache here
    // we'll re-parse the entire document set for each of the three splits.
    jsonStrings.cache()

    val trainValidTest =
        jsonStrings.randomSplit(doubleArrayOf(opts.trainFrac, opts.validFrac, opts.testFrac), splitSeed)
    trainValidTest[0].saveAsTextFile(opts.outPath + "/" + TRAIN_NAME)
    trainValidTest[1].saveAsTextFile(opts.outPath + "/" + VALID_NAME)
    trainValidTest[2].saveAsTextFile(opts.outPath + "/" + TEST_NAME)
}
Upvotes: 1
Views: 373
Reputation: 15714
For a number of reasons, saveAsTextFile is a blocking call. This means that the Spark driver will not submit the second save job until the first has completed.
That said, what you can do if you wish to take advantage of these available resources is to call saveAsTextFile in three separate threads and wait on their futures. Once a worker has finished its partitions of the first job, it can then start on the second.
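As a concrete illustration, here is a minimal Kotlin sketch of that approach; saveSplitsConcurrently and its parameters are made-up names, not part of the question's code, and it uses plain threads with join() rather than explicit futures. Each saveAsTextFile is submitted from its own driver-side thread, so all three jobs are queued with the scheduler up front and idle executors can pick up partitions from any of them.

import org.apache.spark.api.java.JavaRDD
import kotlin.concurrent.thread

// Submit each save from a separate thread on the driver, then wait for all of
// them. Each saveAsTextFile still blocks its own thread, but the three jobs
// are now in flight at the same time.
fun saveSplitsConcurrently(splits: Array<JavaRDD<String>>, outPaths: List<String>) {
    val threads = splits.zip(outPaths).map { (rdd, path) ->
        thread { rdd.saveAsTextFile(path) }
    }
    threads.forEach { it.join() }
}

In the question's run function this would replace the three sequential saveAsTextFile calls, e.g. saveSplitsConcurrently(trainValidTest, listOf(opts.outPath + "/" + TRAIN_NAME, opts.outPath + "/" + VALID_NAME, opts.outPath + "/" + TEST_NAME)).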
Upvotes: 1