shakedzy

Reputation: 2893

Spark accumulator not counting correctly?

Using Spark 2.1, I have a function that takes a DataFrame and checks whether all of its records exist in a given database (Aerospike, in this case).

It looks pretty much like this:

import org.apache.spark.sql.DataFrame

def check(df: DataFrame): Long = {
  val finalResult = df.sparkSession.sparkContext.longAccumulator("finalResult")
  df.rdd.foreachPartition { iter =>
    iter.foreach { record =>
      val success = ??? // 1L if the record is in the database, else 0L (lookup elided)
      // if success == 0, send a Slack message with the missing record
      finalResult.add(success)
    }
  }
  df.count - finalResult.value
}

So the number of Slack messages should match the number the function returns (the total number of missing records), but quite often it doesn't: I get, for example, one Slack message while check returns 2. Rerunning it returns 1.

Any ideas what's happening?

Upvotes: 1

Views: 1438

Answers (2)

user1529294

Reputation: 186

I guess the last line (df.count - finalResult.value) should come after the closing bracket of foreachPartition. An accumulator's value is correct only after the action has finished.
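
A minimal sketch of that ordering, assuming a hypothetical isInDatabase lookup in place of the real Aerospike call:

import org.apache.spark.sql.DataFrame

// Sketch only: the accumulator is read after foreachPartition (an action)
// has returned, so every partition's updates have been applied.
def countMissing(df: DataFrame): Long = {
  val missing = df.sparkSession.sparkContext.longAccumulator("missing")
  df.rdd.foreachPartition { iter =>
    iter.foreach { record =>
      if (!isInDatabase(record)) missing.add(1L) // isInDatabase is hypothetical
    }
  }             // the action has completed here
  missing.value // only now is the driver-side value reliable
}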

Upvotes: 0

sil

Reputation: 450

Spark may run the same task more than once for the same data (for example after a task failure or during speculative execution, possibly on different workers), which means each success can be added to the accumulator once per execution rather than once per record. Hence you can get different accumulator totals for different passes over the same data.

You cannot use accumulators to get an exact count in this case. Sorry. :(
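
If an exact number is needed, one retry-safe alternative is to derive the count from a transformation plus an action instead of from accumulator side effects. A sketch, again assuming a hypothetical isInDatabase lookup:

// Sketch, not the poster's exact code: filter + count is recomputed
// deterministically, so task retries cannot inflate the result.
// isInDatabase stands in for a serializable database lookup.
def countMissingExact(df: DataFrame): Long =
  df.rdd.filter(record => !isInDatabase(record)).count()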

Upvotes: 0
