Elidor00

Reputation: 1624

Scala spark - count null value in dataframe columns using accumulator

I have this function:

def countNullValueColumn(df: DataFrame): Array[(String, Long)] = 
   df.columns
      .map(x => (x, df.filter(df(x).isNull || df(x) === "" || df(x).isNaN).count))

I'm trying to use an accumulator (val counter = sc.longAccumulator) instead of the DataFrame count function, without success.

The attempts I've made have been:

df.columns.foreach(x => {df.filter(df(x).isNull || df(x) === "" || df(x).isNaN) {counter.add(1)} (x, counter.value)})
df.columns.foreach(x => {df.filter(df(x).isNull || df(x) === "" || df(x).isNaN) {counter.add(1); (x, counter.value)} })

Unfortunately, neither of these works because they don't return the correct type (Array[(String, Long)]).

Does anyone have any ideas or suggestions? Thanks in advance

P.S. I don't know whether using an accumulator is more efficient than count, but I would just like to try.

Edit: Should I use foreach instead of map to avoid getting a wrong value in the accumulator, since map is a transformation while foreach is an action?

Edit2: As suggested by @DNA I changed the map to foreach inside my code.

Edit3: Ok, now the problem has become trying to create an Array[(String, Long)]. I tried this, but the :+ operator doesn't work.

val counter = session.sparkContext.longAccumulator
val res: Array[(String, Long)] = Array()
df.columns
    .foreach(x => res :+ (x, df.filter{ df(x).isNull || df(x) === "" || df(x).isNaN {counter.add(1); counter.value}}))

Does anyone have any ideas or suggestions?

Upvotes: 1

Views: 1418

Answers (1)

DNA

Reputation: 42617

The documentation discusses this topic:

Accumulators do not change the lazy evaluation model of Spark. If they are being updated within an operation on an RDD, their value is only updated once that RDD is computed as part of an action. Consequently, accumulator updates are not guaranteed to be executed when made within a lazy transformation like map(). The below code fragment demonstrates this property:

val accum = sc.longAccumulator
data.map { x => accum.add(x); x }
// Here, accum is still 0 because no actions have caused the map operation to be computed.

There is an additional problem with getting reliable results from accumulators:

For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task’s update may be applied more than once if tasks or job stages are re-executed.

So for both of these reasons, one should favour actions such as foreach over transformations such as map if using an accumulator like this.
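To make the difference concrete, here is a minimal sketch that contrasts an accumulator updated inside map with one updated inside foreach. The object name AccumulatorLaziness, the method run and the sample data are made up for illustration, and it assumes a local SparkSession is passed in.

```scala
import org.apache.spark.sql.SparkSession

object AccumulatorLaziness {
  /** Returns (value after an un-executed map, value after a foreach action). */
  def run(spark: SparkSession): (Long, Long) = {
    val sc = spark.sparkContext
    val data = sc.parallelize(Seq(1L, 2L, 3L, 4L)) // illustrative data

    // Updated inside a transformation: nothing runs until an action is called.
    val inMap = sc.longAccumulator("updated in map")
    val mapped = data.map { x => inMap.add(x); x }
    val afterMapOnly = inMap.sum // no action has run on `mapped` yet

    // Updated inside an action: the job runs immediately.
    val inForeach = sc.longAccumulator("updated in foreach")
    data.foreach(x => inForeach.add(x))
    val afterForeach = inForeach.sum

    (afterMapOnly, afterForeach)
  }
}
```

Calling AccumulatorLaziness.run(spark) on a local session should give 0 for the first element, because the map was never executed, and the sum of the data for the second.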

Also, note that you are running foreach over the array of columns, not on the DataFrame itself - then you are running the filter transformation repeatedly on your DataFrame. So in this case, foreach isn't a Spark action at all, it's just a method on Array.
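This is also why the attempts in the question never produce an Array[(String, Long)]. A small plain-Scala sketch (no Spark involved; the column names and the 0L placeholder counts are made up) shows that Array.foreach discards whatever the function returns, and that :+ builds a new array rather than modifying the existing one:

```scala
object ForeachVersusMap {
  // Hypothetical column names standing in for df.columns.
  val columns: Array[String] = Array("name", "age")

  // `:+` returns a NEW array; since the result is not assigned anywhere,
  // `res` is left untouched, and foreach itself returns Unit.
  val res: Array[(String, Long)] = Array()
  columns.foreach(c => res :+ ((c, 0L)))

  // map keeps the value produced for each element, giving the wanted type.
  val fromMap: Array[(String, Long)] = columns.map(c => (c, 0L))
}
```

Here res stays empty after the foreach, while fromMap holds one pair per column.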

So you probably need a map over the df.columns array (so you get an array to return from your function), then a foreach action over the actual DataFrame (to perform the counting).

Here's one way of doing it:

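// requires: import org.apache.spark.sql.Row
df.columns.map(col => {
  val acc = sc.longAccumulator  // sc.accumulator(0) is the deprecated pre-2.0 API
  df.foreach((row: Row) => {    // explicit type avoids an overload ambiguity on some Scala versions
    val v = row.getAs[Any](col)
    if (v == null || v == "") acc.add(1L)  // NaN left as an exercise
  })
  (col, acc.value.longValue)    // unbox java.lang.Long to get (String, Long)
})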

But note that this is always going to be inefficient because we have to make a pass over the DataFrame for each column. It would probably be more efficient to count all the columns in a single pass (generating a tuple or Map of counts for each row), then merge the counts using reduce or fold or similar, rather than using counters.
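As a rough sketch of that single-pass idea, the following keeps one running count per column and merges the partial counts with RDD.aggregate, so the DataFrame is scanned once. The names SinglePassMissingCount, isMissing and countMissing are hypothetical, and the definition of "missing" (null, empty string, NaN) is an assumption taken from the question's filter.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object SinglePassMissingCount {
  // Assumption: null, the empty string and floating-point NaN count as missing.
  def isMissing(v: Any): Boolean = v match {
    case null      => true
    case s: String => s.isEmpty
    case d: Double => d.isNaN
    case f: Float  => f.isNaN
    case _         => false
  }

  def countMissing(df: DataFrame): Array[(String, Long)] = {
    val n = df.columns.length
    val totals = df.rdd.aggregate(Array.fill(n)(0L))(
      // seqOp: fold one row into the per-partition counts
      (counts, row) => {
        var i = 0
        while (i < n) { if (isMissing(row.get(i))) counts(i) += 1L; i += 1 }
        counts
      },
      // combOp: merge the counts of two partitions element-wise
      (a, b) => {
        var i = 0
        while (i < n) { a(i) += b(i); i += 1 }
        a
      }
    )
    df.columns.zip(totals)
  }
}
```

SinglePassMissingCount.countMissing(df) returns the same Array[(String, Long)] shape as the original function. aggregate is used instead of reduce so that an empty DataFrame yields zeros instead of throwing.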

Upvotes: 1
