Reputation: 5892
In Spark Streaming there is foreachRDD with a time parameter, where it is possible to take that time and use it for different purposes: updating metadata, adding a time column to the RDD, and so on.
val stream = KafkaUtils.createDirectStream(...)
stream.foreachRDD { (rdd, time) =>
  // update metadata with time
  // convert rdd to df and add time column
  // write df
}
In Structured Streaming the API looks like this:
val df: Dataset[Row] = spark
  .readStream
  .format("kafka")
  .load()

df.writeStream
  .trigger(...)
  .outputMode(...)
  .start()
How is it possible to get similar (mini-batch) time data in Structured Streaming, so it can be used in the same way?
Upvotes: 2
Views: 632
Reputation: 1590
I have searched for a function that exposes the batch time, but it doesn't seem to exist yet in the Spark Structured Streaming APIs.
Here is a workaround I used to get the batch time (let's suppose that the batch interval is 2000 milliseconds), using foreachBatch, which gives us the batchId. The idea is to approximate each batch's time as the query start time plus batchId times the interval:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

val now = java.time.Instant.now
val batchInterval = 2000L // trigger interval in milliseconds

df.writeStream
  .trigger(Trigger.ProcessingTime(batchInterval))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // approximate batch time: query start time + batchId * interval
    println(now.plusMillis(batchId * batchInterval))
  }
  .outputMode(...)
  .start()
Here's the output:
2019-07-29T17:13:19.880Z
2019-07-29T17:13:21.880Z
2019-07-29T17:13:23.880Z
2019-07-29T17:13:25.880Z
2019-07-29T17:13:27.880Z
2019-07-29T17:13:29.880Z
2019-07-29T17:13:31.880Z
2019-07-29T17:13:33.880Z
2019-07-29T17:13:35.880Z
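To mirror the original foreachRDD use case and attach that time as a column before writing, the same value can be computed inside foreachBatch. Here is a minimal sketch reusing now and batchInterval from the snippet above; the Parquet sink, output path, and column name are just assumptions for illustration:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.streaming.Trigger

df.writeStream
  .trigger(Trigger.ProcessingTime(batchInterval))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // same approximation as above: query start time + batchId * interval
    val batchTime = now.plusMillis(batchId * batchInterval)
    batchDF
      .withColumn("batch_time", lit(batchTime.toString)) // ISO-8601 string column
      .write
      .format("parquet")           // assumed sink format
      .mode("append")
      .save("/tmp/stream-output")  // hypothetical path
  }
  .start()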
I hope it helps!
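Side note: if the goal is only to record when each micro-batch ran (the metadata case from the question) rather than to put the time in the data itself, a StreamingQueryListener reports a per-trigger timestamp without any arithmetic. A minimal sketch, assuming spark is the active SparkSession:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // progress.timestamp is the trigger start time as an ISO-8601 string
    println(s"batch ${event.progress.batchId} triggered at ${event.progress.timestamp}")
  }
})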
Upvotes: 3