Reputation: 2893
Using Spark 2.1, I have a function that takes a DataFrame
and checks whether all of its records exist in a given database (Aerospike in this case).
It looks pretty much like this:
def check(df: DataFrame): Long = {
  val finalResult = df.sparkSession.sparkContext.longAccumulator("finalResult")
  df.rdd.foreachPartition(iter => {
    iter.foreach { record =>
      val success = ??? // 1 if the record is in the database, else 0
      // if success == 0, send a Slack message with the missing record
      finalResult.add(success)
    }
  })
  df.count - finalResult.value
}
So, the number of Slack messages should match the number returned by the function (the total number of missing records), but quite often this is not the case: I get, for example, one Slack message while check returns 2, and rerunning it yields check = 1.

Any ideas what's happening?
Upvotes: 1
Views: 1438
Reputation: 186
I guess the last line (df.count - finalResult.value) should come after the closing bracket of foreachPartition. An accumulator's value is only reliable once the action has finished on the driver.
Upvotes: 0
Reputation: 450
Spark can run the same task more than once for the same data on different workers (task retries, speculative execution), meaning each success can be added to the accumulator once per attempt rather than once per record. Hence you can get different accumulator totals for different passes over the same data.

You cannot use accumulators to get an exact count in this case. Sorry. :(
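Since accumulator updates can be double-counted under retries, one workaround is to express the check as a transformation followed by a single `count` action, and do the Slack reporting from the driver so messages are not duplicated by re-executed tasks. This is only a sketch: `existsInDb` and the `id` column are hypothetical stand-ins for the real Aerospike lookup.

```scala
import org.apache.spark.sql.DataFrame

// Sketch: count missing records via a filter + count instead of an accumulator.
// `existsInDb` stands in for the real Aerospike lookup and must be serializable.
def countMissing(df: DataFrame, existsInDb: String => Boolean): Long =
  df.rdd.mapPartitions { iter =>
    // Keep only the records that are NOT in the database.
    iter.filter(row => !existsInDb(row.getString(0)))
  }.count()
```

If you also need the missing records themselves (e.g. for Slack), replace `count()` with `collect()` and send the messages from the driver, where each record appears exactly once regardless of task retries.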
Upvotes: 0