Reputation: 1223
I have a Spark Scala program which uses a REST API to get data batch by batch, and once all the data is retrieved I operate on it.
Current Program:
- For each batch, create an RDD and merge it with the RDD accumulated from the previous API calls using rdd.union(currentRdd).
- Operate on the final RDD.
A simple program to reproduce the issue:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("Union test").setMaster("local[1]")
  val sc = new SparkContext(conf)
  val limit = 1000
  var rdd: RDD[Int] = sc.emptyRDD[Int] // accumulator RDD, grown by one union per batch
  for (x <- 1 to limit) {
    val currentRdd = sc.parallelize(x to x + 3)
    rdd = rdd.union(currentRdd) // each union adds one more level to the lineage
  }
  println(rdd.sum())
}
Problem:
- When the number of batches is high, the program throws a StackOverflowError:
Exception in thread "main" java.lang.StackOverflowError
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply
I assume that as the number of batches increases, the RDD dependency graph becomes so deep that traversing it overflows the stack.
What is the best way to resolve this problem?
Upvotes: 3
Views: 2328
Reputation: 44957
There is already SparkContext.union, which knows how to properly compute a union of multiple RDDs: it creates a single UnionRDD with all of them as direct parents, so the lineage stays flat:
val rdds = (1 to limit).map(x => sc.parallelize(x to x + 3))
val rdd = sc.union(rdds)
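Since the batches arrive one REST call at a time, the per-batch RDDs can be collected first and unioned once at the end. A minimal sketch adapting the reproduction above (the buffer name is illustrative):
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.rdd.RDD

val batches = ArrayBuffer.empty[RDD[Int]]
for (x <- 1 to limit) {
  batches += sc.parallelize(x to x + 3) // one RDD per API batch
}
val rdd = sc.union(batches) // a single UnionRDD over all batches
println(rdd.sum())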
Alternatively, you could try using this helper function to avoid creating a long chain of unions:
val rdds = List.tabulate(limit + 1)(x => sc.parallelize(x to x + 3))
val rdd = balancedReduce(rdds)(_ union _)
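balancedReduce is not defined in this snippet; a minimal sketch of what such a helper could look like, assuming a generic pairwise reduction that builds a balanced tree of op applications:
// Sketch: reduce pairwise so the result is a balanced binary tree
// of `op` applications, O(log n) deep instead of an O(n) chain.
@annotation.tailrec
def balancedReduce[A](xs: Seq[A])(op: (A, A) => A): A = xs match {
  case Seq()  => throw new IllegalArgumentException("empty sequence")
  case Seq(x) => x
  case _ =>
    val halved = xs.grouped(2).map {
      case Seq(a, b) => op(a, b)
      case Seq(a)    => a // odd element carried to the next round
    }.toSeq
    balancedReduce(halved)(op)
}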
The reason why it should work is essentially the same as in the linked answer: an O(n) chain of unions blows the stack, while an O(log(n))-high binary tree of unions doesn't.
Upvotes: 6