Reputation: 287
I read the answers to "Spark: how to get the number of written rows?" and "How to get the number of records written (using DataFrameWriter's save operation)?". They were really helpful, and the approach worked for my input.
For output, however, taskEnd.taskMetrics.outputMetrics is always None, even though I write to parquet many times in my code.
I added example code with accumulables. The input-row accumulable seems to work correctly, but the output still does not give me the correct results.
I am using Scala and Spark 1.6.
I have 2 questions:
How can I fix this with Spark 1.6?
Does it work correctly with a newer version of Spark?
Attached is my logging in Spark 1.6
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Running total of records read; declared elsewhere in my code, shown here for completeness.
var inputRecords = 0L
val sc = new SparkContext(sparkConf)
sc.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
    val accumulables = taskEnd.taskInfo.accumulables
    if (accumulables.nonEmpty) {
      // Print the first 6 accumulables reported for this task
      for (i <- 0 until 6) {
        println()
        if (accumulables.length > i) {
          println("value of i " + i)
          println("name = " + accumulables(i).name)
          println("value = " + accumulables(i).value)
        }
      }
      // inputMetrics is an Option in Spark 1.6
      if (taskEnd.taskMetrics.inputMetrics.isDefined) {
        println("input records " + taskEnd.taskMetrics.inputMetrics.get.recordsRead)
        inputRecords += taskEnd.taskMetrics.inputMetrics.get.recordsRead
      } else {
        println("task input records are empty")
      }
    }
  }
})
And here is how I write to parquet. I do not use saveAsTable; I use .parquet instead. Do I need to use .saveAsTable for the output to be logged? I am using the Databricks spark-csv package to read in my DataFrame.
df_esd.write.mode("append")
.partitionBy("dt_skey")
.parquet(esd_hdfs_loc)
Thanks, any help is greatly appreciated.
Update: I added some pictures of the output from running the above code. They show sample output of the inner loop running through the accumulables.
As you can see from these 2 pictures, the log of rows written is not very informative, while the other accumulables are more informative. In the first one, rows written was only incremented by one, which doesn't make sense since I am writing millions of records. In the next one it just printed 8 for rows written.
But at the end of the run I get this.
When I check in the database, the number of rows written is the same.
So it seems to me that the last number is the number of rows written, even though it is not called that; it just says number of rows. Also, at the end of the run there is only that one row count, not the other 5 accumulables. Thanks
Upvotes: 1
Views: 2272
Reputation: 41957
If you look at
taskEnd.taskInfo.accumulables
you will see that it is bundled with the following AccumulableInfo entries in a ListBuffer, in sequential order:
AccumulableInfo(1,Some(internal.metrics.executorDeserializeTime),Some(33),Some(33),true,true,None),
AccumulableInfo(2,Some(internal.metrics.executorDeserializeCpuTime),Some(32067956),Some(32067956),true,true,None),
AccumulableInfo(3,Some(internal.metrics.executorRunTime),Some(325),Some(325),true,true,None),
AccumulableInfo(4,Some(internal.metrics.executorCpuTime),Some(320581946),Some(320581946),true,true,None),
AccumulableInfo(5,Some(internal.metrics.resultSize),Some(1459),Some(1459),true,true,None),
AccumulableInfo(7,Some(internal.metrics.resultSerializationTime),Some(1),Some(1),true,true,None),
AccumulableInfo(0,Some(number of output rows),Some(3),Some(3),true,true,Some(sql))
You can see that number of output rows is in the 7th position of the ListBuffer (index 6, since indexing starts at 0), so the way to get the count of rows written is
taskEnd.taskInfo.accumulables(6).value.get
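Since the position of an entry in the list may differ between tasks or Spark versions, looking the entry up by name may be more robust than using a fixed index. Below is a minimal sketch of that idea. The `Acc` case class and the `OutputRows` object are hypothetical stand-ins written for illustration, not part of Spark; `Acc` only mirrors the `name` and `value` fields as they appear in the listing above (both wrapped in `Some`). Note also that more than one SQL operator may report a metric with this name, so this sketch simply returns the first match.

```scala
// Hypothetical stand-in for the two AccumulableInfo fields used here,
// shaped like the listing above: name and value are both Options.
final case class Acc(name: Option[String], value: Option[Any])

object OutputRows {
  // Metric name exactly as it appears in the listing above
  val MetricName = "number of output rows"

  /** Returns the value of the first accumulable named MetricName, if any.
    * Assumes the value prints as a whole number (e.g. 3); a non-numeric
    * value would throw NumberFormatException.
    */
  def find(accs: Seq[Acc]): Option[Long] =
    accs.collectFirst {
      case Acc(Some(MetricName), Some(v)) => v.toString.toLong
    }
}
```

Inside onTaskEnd, the real entries could presumably be adapted with something like taskEnd.taskInfo.accumulables.map(a => Acc(a.name, a.value)), assuming a Spark version where name and value are Options as in the listing above.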
Upvotes: 2