Reputation: 595
I'm reading data from Kinesis and writing it to ElasticEearch via Spark structured streaming. I need to store the timestamp at which each micro-batch is written to the ElasticSearch index as part of the fields in each record.
For example the first micro-batch from the stream contains 10K records, the timestamp for these 10K records should reflect the moment they were processed (or written to ElasticSearch). Then we should have a new timestamp when the second micro-batch is processed, and so on.
I tried adding a new column with current_timestamp function:
.withColumn("recordDate", current_timestamp())
But it looks like the function is evaluated only once for the entire query lifetime. As a result, all stored records will have the same timestamp indicating the moment the query started. So this timestamp seems to represent "query start datetime" as opposed to the desired one that represents "record datetime".
It would be really great if someone could help explain how this can be achieved.
Much appreciated
Upvotes: 1
Views: 1411
Reputation: 2855
You can do this using a udf like below, You may also add your own formatting,
import org.apache.spark.sql.functions.udf
def current_time = udf(() => {
java.time.LocalDateTime.now().toString
})
To use it,
val streamingDF = ???
val streamingDFWithTime .withColumn("time", current_time()))
streamingDFWithTime.writeStream
...
...
PS: I used a udf in place of the in built current_timestamp
because, using it directly on a stream results in issue discussed here and here
Hope this helps.
Upvotes: 4