Reputation: 63
I encountered the following problem in Spark:
...
while (...) {
  key = filtersIterator.next()
  pricesOverLeadTimesKeyFiltered = pricesOverLeadTimesFilteredMap_cached
    .filter(x => x._1.equals(key))
    .values
  resultSRB = processLinearBreakevens(pricesOverLeadTimesKeyFiltered)
  resultsSRB = resultsSRB.union(resultSRB)
}
....
This way, I accumulate the same resultSRB into resultsSRB on every iteration. However, there are "some" tricks that make each iteration add a different (correct) resultSRB:

- calling resultSRB.collect(), resultSRB.foreach(println), or println(resultSRB.count) after each processLinearBreakevens(..) call
- calling an action on pricesOverLeadTimesKeyFiltered at the beginning of processLinearBreakevens(..)
It seems I need to ensure that all operations are "flushed" before performing the union. I have already tried doing the union through a temporary variable, persisting resultSRB, and persisting pricesOverLeadTimesKeyFiltered, but the problem remains.
Could you help me please? Michael
Upvotes: 0
Views: 280
Reputation: 67095
If my assumption is correct, that all of these are var, then the problem is closures. key needs to be a val, as it is being lazily captured into your filter. So, when the filter is finally acted on, it is always using the last state of key.
My example:
def process(filtered: RDD[Int]) = filtered.map(x => x + 1)

var i = 1
var key = 1
var filtered = sc.parallelize(List[Int]())
var result = sc.parallelize(List[Int]())
var results = sc.parallelize(List[Int]())
val cached = sc.parallelize(1 to 1000).map(x => (x, x)).persist

while (i <= 3) {
  key = i * 10
  val filtered = cached
    .filter(x => x._1.equals(key))
    .values
  val result = process(filtered)
  results = results.union(result)
  i = i + 1
}
results.collect
//expect 11,21,31 but get 31, 31, 31
To fix it, change key to be a val inside the while loop and you will get your expected 11, 21, 31.
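A minimal sketch of the fixed loop (assuming the same sc, process, and cached setup as above; it is not runnable outside a Spark driver):

    // Fixed version: key is a val bound fresh on each iteration,
    // so each filter closure captures that iteration's immutable value
    // instead of a shared mutable variable.
    var i = 1
    var results = sc.parallelize(List[Int]())

    while (i <= 3) {
      val key = i * 10                  // val: a new binding per iteration
      val filtered = cached
        .filter(x => x._1.equals(key))  // closure sees this iteration's key
        .values
      results = results.union(process(filtered))
      i = i + 1
    }
    results.collect // now yields the expected 11, 21, 31

The same reasoning applies to the original question: binding key with val inside the while loop (rather than reassigning an outer var) should make each pricesOverLeadTimesKeyFiltered filter on the intended key when the RDD is eventually evaluated.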
Upvotes: 1