javadev

Reputation: 287

Counting the number of output rows Apache Spark error for outputMetrics

I read the answers to "Spark: how to get the number of written rows?" and "How to get the number of records written (using DataFrameWriter's save operation)?". They were really helpful, and the approach worked for my input.

For output, however, the metric is always None, even though I write to Parquet many times in my code (taskEnd.taskMetrics.outputMetrics is always None).

I added example code that prints the accumulables, but the output still does not give me the correct results. For the input rows, the accumulable seems to work correctly.

I am using Scala and Spark 1.6.

I have 2 questions.

  1. How can I fix this with Spark 1.6?

  2. Does it work correctly with a newer version of Spark?

Here is my logging code for Spark 1.6:

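import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Running total of input records (assumed to be declared like this; the
// declaration was not shown in the original snippet)
var inputRecords = 0L

val sc = new SparkContext(sparkConf)
sc.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    synchronized {
      // Print the name and value of the first six accumulables of this task
      if (taskEnd.taskInfo.accumulables.nonEmpty) {
        for (i <- 0 until 6) {
          println()
          if (taskEnd.taskInfo.accumulables.length > i) {
            println("value of i " + i)
            println("name = " + taskEnd.taskInfo.accumulables(i).name)
            println("value =  " + taskEnd.taskInfo.accumulables(i).value)
          }
        }
      }

      // In Spark 1.6, inputMetrics is an Option
      if (taskEnd.taskMetrics.inputMetrics != None) {
        println("input records " + taskEnd.taskMetrics.inputMetrics.get.recordsRead)
        inputRecords += taskEnd.taskMetrics.inputMetrics.get.recordsRead
      }
      else {
        println("task input records are empty")
      }
    }
  }
})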

Here is how I write to Parquet. I use .parquet rather than saveAsTable. Do I need to use .saveAsTable for the output metrics to be logged? I am using the Databricks CSV package to read in my DataFrame.

df_esd.write.mode("append")
  .partitionBy("dt_skey")
  .parquet(esd_hdfs_loc)

Thanks, any help is greatly appreciated.

Update: I added some pictures of the output from running the above code. Sample output of the inner loop running through the accumulables:

enter image description here

enter image description here

As you can see from these 2 pictures, the logged value for rows written is not very informative, while the other accumulables are. In the first picture it just incremented rows written by one, which doesn't make sense since I am writing millions of records, and in the second it just printed 8 for rows written.

But at the end of the run I get this:

enter image description here

I then checked in the database whether this is the number of rows written:

enter image description here

It is the same. To me it seems that the last number is the number of rows written, even though it is not called that; it just says "number of rows". Also, at the end of the run there is only this one row count, not the other 5 accumulables. Thanks.

Upvotes: 1

Views: 2272

Answers (1)

Ramesh Maharjan

Reputation: 41957

If you look at

taskEnd.taskInfo.accumulables

you will see that it is a ListBuffer holding the following AccumulableInfo entries, in this order:

AccumulableInfo(1,Some(internal.metrics.executorDeserializeTime),Some(33),Some(33),true,true,None),
AccumulableInfo(2,Some(internal.metrics.executorDeserializeCpuTime),Some(32067956),Some(32067956),true,true,None),
AccumulableInfo(3,Some(internal.metrics.executorRunTime),Some(325),Some(325),true,true,None),
AccumulableInfo(4,Some(internal.metrics.executorCpuTime),Some(320581946),Some(320581946),true,true,None),
AccumulableInfo(5,Some(internal.metrics.resultSize),Some(1459),Some(1459),true,true,None),
AccumulableInfo(7,Some(internal.metrics.resultSerializationTime),Some(1),Some(1),true,true,None),
AccumulableInfo(0,Some(number of output rows),Some(3),Some(3),true,true,Some(sql))

You can see that "number of output rows" is in the 7th position (index 6) of the ListBuffer, so the count of rows written can be read with

taskEnd.taskInfo.accumulables(6).value.get
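Since the position of an entry may vary between tasks, it may be safer to look the metric up by name rather than by index. The following is only a minimal sketch of that idea, not Spark's own API: `AccInfo` and `OutputRows` are hypothetical names, and `AccInfo` merely mirrors the `name` and `update` fields as they appear in the listing above.

```scala
// Hypothetical stand-in for the two AccumulableInfo fields used here
// (both are Options in the listing above); this is not the real Spark class.
final case class AccInfo(name: Option[String], update: Option[Any])

object OutputRows {
  // Metric name exactly as printed in the listing above.
  val MetricName = "number of output rows"

  // Sum this task's contribution from every accumulable with the given name,
  // counting entries whose update is not a whole number as 0.
  def fromTask(accs: Seq[AccInfo], metric: String = MetricName): Long =
    accs.collect {
      case AccInfo(Some(`metric`), Some(v)) =>
        scala.util.Try(v.toString.toLong).getOrElse(0L)
    }.sum
}
```

Inside onTaskEnd, one could map taskEnd.taskInfo.accumulables to `AccInfo(a.name, a.update)` and add the result of `OutputRows.fromTask` to a running total. This assumes the field types shown in the listing; on Spark 1.6 the types may differ, so the mapping would need adjusting. Also note that more than one accumulable may carry this name within a task, in which case a plain sum could overcount the rows actually written.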

Upvotes: 2
