I have the following code snippet to combine a Seq of (DataFrame, BloomFilter) tuples. The DFs are in chronological order, and the goal is to union and deduplicate all the DFs while always selecting the latest record. These are the input DFs in order:
+---+----------------+
|id |data            |
+---+----------------+
|d  |[8]             |
+---+----------------+
+---+----------------+
|id |data            |
+---+----------------+
|b  |[4]             |
|c  |[4]             |
+---+----------------+
+---+----------------+
|id |data            |
+---+----------------+
|b  |[4, 16]         |
+---+----------------+
And this is the code:
println("print input dfs")
dfList.foreach(_._1.show(20, false))
val combined = dfList.reverse.reduceLeft((tupNewer, tupOlder) => {
  println("tupNewer")
  tupNewer._1.show(20, false)
  println("tupOlder")
  tupOlder._1.show(20, false)
  val newerDfBloomFilterUDF = udf((s: String) => !tupNewer._2.mightContain(s))
  val filteredOlderDf = tupOlder._1.filter(newerDfBloomFilterUDF(col("id")))
  val unionedDf = tupNewer._1.union(filteredOlderDf)
  println("unionedDf")
  unionedDf.show(20, false)
  val mergedBloomFilter = tupNewer._2.mergeInPlace(tupOlder._2)
  (unionedDf, mergedBloomFilter)
})
println("print combined")
combined._1.show(20, false)
However, when I run it the output makes no sense:
tupNewer
+---+----------------+
|id |data            |
+---+----------------+
|b  |[4, 16]         |
+---+----------------+
tupOlder
+---+----------------+
|id |data            |
+---+----------------+
|b  |[4]             |
|c  |[4]             |
+---+----------------+
unionedDf
+---+----------------+
|id |data            |
+---+----------------+
|b  |[4, 16]         |
|c  |[4]             |
+---+----------------+
tupNewer
+---+----------------+
|id |data            |
+---+----------------+
|b  |[4, 16]         |
+---+----------------+
tupOlder
+---+----------------+
|id |data            |
+---+----------------+
|d  |[8]             |
+---+----------------+
unionedDf
+---+----------------+
|id |data            |
+---+----------------+
|b  |[4, 16]         |
|d  |[8]             |
+---+----------------+
print combined
+---+----------------+
|id |data            |
+---+----------------+
|b  |[4, 16]         |
+---+----------------+
At the very least, each tupNewer value should be equivalent to the previous unionedDf value, as that is what is passed on as the intermediate result after every iteration. However, it seems as though the accumulator value of the reduceLeft is not being updated, so each DF is compared to the first one while the intermediate steps are lost. This is driving me up the wall. Any help is much appreciated.
I've tried writing the same loop with just DFs rather than tuples and commented out all lines referencing the BloomFilter logic. That code worked as expected.
I then rewrote it with tuples of primitives instead of DFs and BloomFilters to make sure that there wasn't something wrong with using tuples and that worked too.
In other words, I can't isolate what part of the above code is causing such strange behavior from reduceLeft.
My best guess at this point would be some kind of typing issue, though I am at a loss on how to proceed.
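One possible explanation, offered as a sketch rather than a confirmed diagnosis: DataFrames are evaluated lazily, so every show() re-runs the whole plan, including the UDF that holds a reference to tupNewer._2. Since mergeInPlace modifies that same BloomFilter object, a later show() may filter against the merged filter instead of the one that existed when the filter step was defined. The plain Scala example below reproduces that pattern without Spark. LazyFrame, IdFilter, batch, sample, combine, and mergeIntoCopy are all hypothetical stand-ins invented for illustration, and a mutable Set plays the role of the BloomFilter.

```scala
import scala.collection.mutable

object LazyMutationSketch {
  type Row = (String, List[Int])

  // Hypothetical stand-in for a DataFrame: the plan is re-run on every collect(),
  // much like a Spark action re-evaluates a lazy query plan.
  final case class LazyFrame(plan: () => List[Row]) {
    def filter(p: Row => Boolean): LazyFrame = LazyFrame(() => plan().filter(p))
    def union(other: LazyFrame): LazyFrame = LazyFrame(() => plan() ++ other.plan())
    def collect(): List[Row] = plan()
  }

  // Hypothetical stand-in for a BloomFilter: a mutable set of ids.
  type IdFilter = mutable.Set[String]

  def batch(rows: Row*): (LazyFrame, IdFilter) = {
    val data = rows.toList
    (LazyFrame(() => data), mutable.Set(data.map(_._1): _*))
  }

  // Same input order as in the question: oldest first.
  def sample(): Seq[(LazyFrame, IdFilter)] = Seq(
    batch("d" -> List(8)),
    batch("b" -> List(4), "c" -> List(4)),
    batch("b" -> List(4, 16))
  )

  def combine(batches: Seq[(LazyFrame, IdFilter)], mergeIntoCopy: Boolean): List[Row] = {
    val combined = batches.reverse.reduceLeft { (newer, older) =>
      // The predicate captures a reference to newer._2, not a snapshot of its contents.
      val filteredOlder = older._1.filter(row => !newer._2.contains(row._1))
      val unioned = newer._1.union(filteredOlder)
      // Either merge into a fresh copy (captured filter stays untouched)
      // or merge in place (captured filter changes after the fact).
      val target: IdFilter = if (mergeIntoCopy) newer._2.clone() else newer._2
      target ++= older._2
      (unioned, target)
    }
    // Nothing is evaluated until here, after all merges have already happened.
    combined._1.collect()
  }

  def main(args: Array[String]): Unit = {
    println(combine(sample(), mergeIntoCopy = false)) // List((b,List(4, 16)))
    println(combine(sample(), mergeIntoCopy = true))  // List((b,List(4, 16)), (c,List(4)), (d,List(8)))
  }
}
```

With the in-place merge, the final result contains only the b row, which matches the "print combined" output above. If this is indeed the cause, the analogous change in the Spark code would be to merge into a separate BloomFilter object rather than into tupNewer._2, for example a copy made by round-tripping through writeTo and BloomFilter.readFrom. That suggestion is untested here and should be treated as an assumption.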