PauloMarco

Reputation: 75

Getting the number of rows in a Spark dataframe without counting

I am applying many transformations on a Spark DataFrame (filter, groupBy, join). I want to have the number of rows in the DataFrame after each transformation.

I am currently counting the number of rows with count() after each transformation, but each call triggers a separate action that re-executes part of the job, which is not really efficient.

I was wondering if there is any way of knowing the number of rows without having to trigger another action than the original job.
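For reference, the current pattern looks roughly like this (the DataFrame and column names are illustrative only):

val filtered = myDataFrame.filter(col("x") === lit(3))
println(filtered.count())   // action #1

val grouped = filtered.groupBy(col("x")).agg(max("y"))
println(grouped.count())    // action #2, re-runs the filter

val joined = grouped.join(myOtherDataframe, col("x") === col("y"))
println(joined.count())     // action #3, re-runs everything upstream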

Upvotes: 5

Views: 5327

Answers (3)

PauloMarco

Reputation: 75

Coming back to this question with a bit more experience on Apache Spark, to complement randal25's answer.

You can also use a UDF to increment an accumulator.

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, lit, max, udf}
import org.apache.spark.util.LongAccumulator

val filterCounter = spark.sparkContext.longAccumulator("filter-counter")
val groupByCounter = spark.sparkContext.longAccumulator("group-counter")
val joinCounter = spark.sparkContext.longAccumulator("join-counter")

// Pass-through UDF that increments the given accumulator for every row it sees
def countUdf(acc: LongAccumulator): UserDefinedFunction = udf { (x: Int) =>
  acc.add(1)
  x
}

myDataFrame
  .filter(col("x") === lit(3))
  .withColumn("x", countUdf(filterCounter)(col("x")))
  .groupBy(col("x"))
  .agg(max("y"))
  .withColumn("x", countUdf(groupByCounter)(col("x")))
  .join(myOtherDataframe, col("x") === col("y"))
  .withColumn("x", countUdf(joinCounter)(col("x")))
  .count()  // single action; the accumulators are filled while the job runs

println(s"count for filter = ${filterCounter.value}")
println(s"count for group by = ${groupByCounter.value}")
println(s"count for join = ${joinCounter.value}")

This should be more efficient than the map-based approach because Spark only has to deserialize the column passed to the UDF. It has to be used carefully, though: Catalyst can reorder the operations (for example, pushing a filter below the UDF call), which would change what each accumulator counts.
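One way to make such reordering less likely, assuming Spark 2.3 or later, is to mark the counting UDF as non-deterministic, since Catalyst avoids reordering or eliminating non-deterministic expressions:

// Sketch: the same pass-through UDF, marked non-deterministic (Spark 2.3+)
def countUdf(acc: LongAccumulator): UserDefinedFunction =
  udf { (x: Int) => acc.add(1); x }.asNondeterministic()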

Upvotes: 0

DaRkMaN

Reputation: 1054

Each operator has a couple of metrics of its own. These metrics are visible in the SQL tab of the Spark UI.

If you do not want to go through the UI, you can introspect the DataFrame's query execution object after the action has run to access the metrics (internally these are accumulators).

Example: df.queryExecution.executedPlan.metrics gives the metrics of the topmost node in the DAG.
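A minimal sketch, assuming a DataFrame named df and an action such as collect() that runs through df's own query execution (count() plans a separate aggregation, so it may not populate this plan's metrics); the available metric names, such as numOutputRows, depend on the operator:

// Run an action that executes this Dataset's own physical plan
df.collect()

// Metrics of the topmost node of the executed physical plan
df.queryExecution.executedPlan.metrics.foreach { case (name, metric) =>
  println(s"$name = ${metric.value}")
}

// Walk the whole plan to read per-operator row counts where available
df.queryExecution.executedPlan.foreach { node =>
  node.metrics.get("numOutputRows").foreach { m =>
    println(s"${node.nodeName}: numOutputRows = ${m.value}")
  }
}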

Upvotes: 2

randal25

Reputation: 1330

You could use an accumulator for each stage and increment it in a map after that stage. Then, once your action at the end has run, you will have a count for every stage. Note that accumulators updated inside transformations can over-count if tasks or stages are retried; Spark only guarantees exactly-once updates for accumulators used inside actions.

import org.apache.spark.sql.functions.{col, lit, max}

val filterCounter = spark.sparkContext.longAccumulator("filter-counter")
val groupByCounter = spark.sparkContext.longAccumulator("group-counter")
val joinCounter = spark.sparkContext.longAccumulator("join-counter")

// Note: calling .map on a DataFrame requires an Encoder[Row] in scope
// (for example one built with RowEncoder from the current schema).
myDataFrame
    .filter(col("x") === lit(3))
    .map(x => {
      filterCounter.add(1)
      x
    })
    .groupBy(col("x"))
    .agg(max("y"))
    .map(x => {
      groupByCounter.add(1)
      x
    })
    .join(myOtherDataframe, col("x") === col("y"))
    .map(x => {
      joinCounter.add(1)
      x
    })
    .count()

println(s"count for filter = ${filterCounter.value}")
println(s"count for group by = ${groupByCounter.value}")
println(s"count for join = ${joinCounter.value}")

Upvotes: 5
