ChiralCarbon

Reputation: 347

Streaming query not showing any progress in Spark

I am getting status messages of the following form from a Spark Structured Streaming application:

  18/02/12 16:38:54 INFO StreamExecution: Streaming query made progress: {
  "id" : "a6c37f0b-51f4-47c5-a487-8bd269b80142",
  "runId" : "061e41b4-f488-4483-a290-403f1f7eff03",
  "name" : null,
  "timestamp" : "2018-02-12T11:08:54.323Z",
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 30,
    "triggerExecution" : 46
  },
  "eventTime" : {
    "watermark" : "1970-01-01T00:00:00.000Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "FileStreamSource[file:/home/chiralcarbon/IdeaProjects/spark_structured_streaming/args[0]]",
    "startOffset" : null,
    "endOffset" : null,
    "numInputRows" : 0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@bcc171"
  }
}

All of the messages have numInputRows with value 0.

The program streams data from a parquet file and outputs the same stream to the console. The code is as follows:

def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder
      .master("local")
      .appName("sparkSession")
      .getOrCreate()

    val schema = ..
    val in = spark.readStream
      .schema(schema)
      .parquet("args[0]")

    val query = in.writeStream
      .format("console")
      .outputMode("append")
      .start()
    query.awaitTermination()
  }

}

What is the cause and how do I resolve this?

Upvotes: 3

Views: 2269

Answers (1)

T. Gawęda

Reputation: 16076

You have an error in your readStream call:

val in = spark.readStream
      .schema(schema)
      .parquet("args[0]")

You probably want to read from the directory provided in the first program argument. In that case, pass the argument directly:

val in = spark.readStream
      .schema(schema)
      .parquet(args(0))

or, if the expression is longer or involves some concatenation, use string interpolation in the last line:

.parquet(s"${args(0)}")

Currently your code tries to read from a non-existent directory literally named args[0], so no files are found. After the change, the directory path is resolved correctly and Spark will start reading the files.
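The difference can be sketched in plain Scala, independent of Spark (the /data/input path below is purely hypothetical):

```scala
object ArgDemo {
  def main(ignored: Array[String]): Unit = {
    // Hypothetical command-line arguments, as Spark's main would receive them.
    val args = Array("/data/input")

    val literal      = "args[0]"      // a literal string: Spark would look for a directory named "args[0]"
    val firstArg     = args(0)        // indexes the array: "/data/input"
    val interpolated = s"${args(0)}"  // string interpolation, same result

    println(literal)       // args[0]
    println(firstArg)      // /data/input
    println(interpolated)  // /data/input
  }
}
```

Note that Scala uses parentheses, not square brackets, for array indexing, which is why `args(0)` is the correct form and why quoting it turned it into a plain string.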

Upvotes: 2
