Reputation: 1624
I have this function:
def countNullValueColumn(df: DataFrame): Array[(String, Long)] =
  df.columns
    .map(x => (x, df.filter(df(x).isNull || df(x) === "" || df(x).isNaN).count))
I'm trying to use a val counter = sc.longAccumulator
instead of the DataFrame count function, without success.
The attempts I've made have been:
df.columns.foreach(x => {df.filter(df(x).isNull || df(x) === "" || df(x).isNaN) {counter.add(1)} (x, counter.value)})
df.columns.foreach(x => {df.filter(df(x).isNull || df(x) === "" || df(x).isNaN) {counter.add(1); (x, counter.value)} })
Unfortunately, none of these work because they don't return the correct type (Array[(String, Long)]).
Does anyone have any ideas or suggestions? Thanks in advance
P.S. I don't know whether using the accumulator is more efficient than the count, but I would just like to try it.
Edit: Should I use a foreach instead of a map to avoid getting a wrong value in the accumulator, since map is a transformation while foreach is an action?
Edit2: As suggested by @DNA, I changed the map to foreach inside my code.
Edit3: OK, now the problem has become trying to create an Array[(String, Long)]. I tried this, but the :+ operator doesn't work.
val counter = session.sparkContext.longAccumulator
val res: Array[(String, Long)] = Array()
df.columns
.foreach(x => res :+ (x, df.filter{ df(x).isNull || df(x) === "" || df(x).isNaN {counter.add(1); counter.value}}))
Does anyone have any ideas or suggestions?
Upvotes: 1
Views: 1418
Reputation: 42617
The documentation discusses this topic:
Accumulators do not change the lazy evaluation model of Spark. If they are being updated within an operation on an RDD, their value is only updated once that RDD is computed as part of an action. Consequently, accumulator updates are not guaranteed to be executed when made within a lazy transformation like map(). The below code fragment demonstrates this property:
val accum = sc.longAccumulator
data.map { x => accum.add(x); x }
// Here, accum is still 0 because no actions have caused the map operation to be computed.
There is an additional problem with getting reliable results from accumulators:
For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task’s update may be applied more than once if tasks or job stages are re-executed.
So for both of these reasons, one should favour actions such as foreach over transformations such as map when using an accumulator like this.
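For contrast, here is a minimal sketch (not from the documentation; it assumes data is an RDD of numbers, as in the fragment quoted above) of the reliable pattern, with the accumulator updated inside an action:

val accum = sc.longAccumulator
data.foreach { x => accum.add(x) } // foreach is an action, so each task's update is applied only once, even if tasks are re-run
println(accum.value)               // now reflects the sum of the elements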
Also, note that you are running foreach over the array of columns, not on the DataFrame itself, and then running the filter transformation repeatedly on your DataFrame. So in this case, foreach isn't a Spark action at all; it's just a method on Array.
So you probably need a map over the df.columns array (so you get an array to return from your function), then a foreach action over the actual DataFrame (to perform the counting).
Here's one way of doing it:
df.columns.map(col => {
  // one accumulator per column, updated inside a foreach action on the DataFrame
  val acc = sc.longAccumulator
  df.foreach(row => {
    val v = row.getAs[Any](col)
    if (v == null || v == "") acc.add(1) // NaN left as an exercise
  })
  (col, acc.value.toLong) // LongAccumulator.value is a java.lang.Long
})
But note that this is always going to be inefficient, because we have to make a pass over the DataFrame for each column. It would probably be more efficient to count all the columns in a single pass (generating a tuple or Map of counts for each row), then merge the counts using reduce or fold or similar, rather than using counters.
Upvotes: 1