Michael Benguigui

Reputation: 63

duplicated rows in RDD

I encountered the following problem in Spark:

...
while (...) {
    key = filtersIterator.next()
    // filter the cached RDD down to the rows matching the current key
    pricesOverLeadTimesKeyFiltered = pricesOverLeadTimesFilteredMap_cached
       .filter(x => x._1.equals(key))
       .values
    resultSRB = processLinearBreakevens(pricesOverLeadTimesKeyFiltered)
    // accumulate this iteration's result
    resultsSRB = resultsSRB.union(resultSRB)
}
....

This way, I accumulate the same resultSRB into resultsSRB on every iteration. Are there some tricks that would allow me to add a different/correct resultSRB for each iteration?

It seems I need to ensure that all operations are "flushed" before performing the union. I have already tried performing the union through a temporary variable, persisting resultSRB, and persisting pricesOverLeadTimesKeyFiltered, but the problem remains.

Could you help me, please? Michael

Upvotes: 0

Views: 280

Answers (1)

Justin Pihony

Reputation: 67095

If my assumption is correct, that all of these are var, then the problem is closures. key needs to be a val, as it is lazily captured into your filter. So, when the RDD is finally acted upon, the filter always uses the last state of key.

My example:

import org.apache.spark.rdd.RDD

// assumes sc is an available SparkContext (e.g. in spark-shell)
def process(filtered: RDD[Int]) = filtered.map(x => x + 1)

var i = 1
var key = 1
var results = sc.parallelize(List[Int]())
val cached = sc.parallelize(1 to 1000).map(x => (x, x)).persist
while (i <= 3) {
    key = i * 10
    // the filter closure lazily captures the var key;
    // nothing is evaluated until the action below
    val filtered = cached
       .filter(x => x._1.equals(key))
       .values
    val result = process(filtered)
    results = results.union(result)
    i = i + 1
}
results.collect
// expect 11, 21, 31 but get 31, 31, 31

To fix it, change key to a val inside the while loop and you will get your expected 11, 21, 31.
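For completeness, here is a sketch of the corrected loop, assuming the same sc, cached, and process as defined above; the only change is that key is now a val bound inside the loop:

var i = 1
var results = sc.parallelize(List[Int]())
while (i <= 3) {
    // val: each iteration's closure captures its own immutable key
    val key = i * 10
    val filtered = cached
       .filter(x => x._1.equals(key))
       .values
    results = results.union(process(filtered))
    i = i + 1
}
results.collect
// now yields 11, 21, 31 as expected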

Upvotes: 1
