Reputation: 75
I am applying many transformations to a Spark DataFrame (filter, groupBy, join). I want to know the number of rows in the DataFrame after each transformation.
I am currently counting the rows by calling count() after each transformation, but each call triggers a separate action, which is not very efficient.
Is there any way to know the number of rows without triggering an action other than the original job?
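For illustration, a minimal sketch of the approach described above (the DataFrame and column names are just placeholders, borrowed from the answers below):
import org.apache.spark.sql.functions.{col, lit, max}

// Each count() launches its own Spark job on top of the final one.
val filtered = myDataFrame.filter(col("x") === lit(3))
println(s"rows after filter = ${filtered.count()}")

val grouped = filtered.groupBy(col("x")).agg(max("y"))
println(s"rows after group by = ${grouped.count()}")

val joined = grouped.join(myOtherDataframe, col("x") === col("y"))
println(s"rows after join = ${joined.count()}")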
Upvotes: 5
Views: 5327
Reputation: 75
Coming back to this question with a bit more experience with Apache Spark, to complement randal's answer.
You can also use a UDF to increment a counter.
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, lit, max, udf}
import org.apache.spark.util.LongAccumulator

val filterCounter = spark.sparkContext.longAccumulator("filter-counter")
val groupByCounter = spark.sparkContext.longAccumulator("group-counter")
val joinCounter = spark.sparkContext.longAccumulator("join-counter")

// Identity UDF that increments the given accumulator for every row it sees.
def countUdf(acc: LongAccumulator): UserDefinedFunction = udf { (x: Int) =>
  acc.add(1)
  x
}
myDataFrame
  .filter(col("x") === lit(3))
  .withColumn("x", countUdf(filterCounter)(col("x")))
  .groupBy(col("x"))
  .agg(max("y"))
  .withColumn("x", countUdf(groupByCounter)(col("x")))
  .join(myOtherDataframe, col("x") === col("y"))
  .withColumn("x", countUdf(joinCounter)(col("x")))
  .count()

print(s"count for filter = ${filterCounter.value}")
print(s"count for group by = ${groupByCounter.value}")
print(s"count for join = ${joinCounter.value}")
This should be more efficient because Spark only has to deserialize the column used in the UDF. However, it must be used carefully, since Catalyst can reorder the operations (for example, pushing a filter below the call to the UDF), which would skew the counts.
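If that reordering is a concern, one option (a sketch, assuming Spark 2.3+) is to mark the UDF as non-deterministic, which discourages the optimizer from moving it around:
// Same identity UDF, but marked non-deterministic so Catalyst avoids
// reordering or eliminating it during optimization.
def countUdf(acc: LongAccumulator): UserDefinedFunction =
  udf { (x: Int) =>
    acc.add(1)
    x
  }.asNondeterministic()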
Upvotes: 0
Reputation: 1054
Each operator has its own set of metrics. These metrics are visible in the SQL tab of the Spark UI.
If the SQL tab is not an option, you can introspect the query execution object of the DataFrame after execution to access the metrics (which are accumulators internally).
Example: df.queryExecution.executedPlan.metrics
gives the metrics of the topmost node in the DAG.
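As a rough sketch (assuming the standard "numOutputRows" metric is registered on the operators of interest), you can also walk the whole executed plan after an action and print the row count per operator:
// Trigger an action first so the metrics get populated.
df.count()

// Traverse every node of the physical plan and print its output row count.
df.queryExecution.executedPlan.foreach { node =>
  node.metrics.get("numOutputRows").foreach { metric =>
    println(s"${node.nodeName}: ${metric.value} rows")
  }
}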
Upvotes: 2
Reputation: 1330
You could use an accumulator for each stage, incrementing the accumulator in a map after each stage. Then, once you run your action at the end, you would have a count for all the stages.
val filterCounter = spark.sparkContext.longAccumulator("filter-counter")
val groupByCounter = spark.sparkContext.longAccumulator("group-counter")
val joinCounter = spark.sparkContext.longAccumulator("join-counter")
myDataFrame
  .filter(col("x") === lit(3))
  .map(x => {
    filterCounter.add(1)
    x
  })
  .groupBy(col("x"))
  .agg(max("y"))
  .map(x => {
    groupByCounter.add(1)
    x
  })
  .join(myOtherDataframe, col("x") === col("y"))
  .map(x => {
    joinCounter.add(1)
    x
  })
  .count()
print(s"count for filter = ${filterCounter.value}")
print(s"count for group by = ${groupByCounter.value}")
print(s"count for join = ${joinCounter.value}")
Upvotes: 5