Reputation: 888
I am looking for a way to add the Kafka timestamp value to my Spark Structured Streaming schema. I have extracted the value field from Kafka and am building a DataFrame from it. My issue is that I also need the timestamp field (from Kafka) along with the other columns.
Here is my current code:
val kafkaDatademostr = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","zzzz.xxx.xxx.xxx.com:9002")
.option("subscribe","csvstream")
.load
val interval = kafkaDatademostr.select(col("value").cast("string")).alias("csv")
.select("csv.*")
val xmlData = interval.selectExpr("split(value,',')[0] as ddd" ,
"split(value,',')[1] as DFW",
"split(value,',')[2] as DTG",
"split(value,',')[3] as CDF",
"split(value,',')[4] as DFO",
"split(value,',')[5] as SAD",
"split(value,',')[6] as DER",
"split(value,',')[7] as time_for",
"split(value,',')[8] as fort")
How can I get the timestamp from Kafka and add it as a column along with the other columns?
Upvotes: 1
Views: 4479
Reputation: 2415
In my case, I was receiving the values from Kafka in JSON format, which contains the actual data along with the original event time, not the Kafka timestamp. Below is the schema.
import org.apache.spark.sql.types._

val mySchema = StructType(Array(
  StructField("time", LongType),
  StructField("close", DoubleType)
))
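For illustration, an incoming message matching this schema might look like the following (the values here are made up):

{"time": 1541512800, "close": 120.5}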
In order to use the watermarking feature of Spark Structured Streaming, I had to cast the time field to the timestamp format.
import spark.implicits._
import org.apache.spark.sql.functions._

// parse the JSON payload and convert the epoch-seconds time field to a proper timestamp
val df1 = df.selectExpr("CAST(value AS STRING)").as[String]
  .select(from_json($"value", mySchema).as("data"))
  .select(col("data.time").cast("timestamp").alias("time"), col("data.close"))
Now you can use the time field for window operations as well as for watermarking.
// 1-minute windows sliding every 30 seconds, with a 1-minute watermark on event time
val windowedData = df1
  .withWatermark("time", "1 minute")
  .groupBy(
    window(col("time"), "1 minute", "30 seconds"),
    $"close"
  )
  .count()
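To actually run this aggregation you still need to start a streaming query with a sink; here is a minimal sketch assuming the built-in console sink, just for illustration:

val query = windowedData.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", "false")
  .start()

query.awaitTermination()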
I hope this answer helps.
Upvotes: 0
Reputation: 6593
On the Apache Spark official website you can find the guide: Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher).
There you can find information about the schema of the DataFrame that is loaded from Kafka.
Each row from the Kafka source has the following columns: key (binary), value (binary), topic (string), partition (int), offset (long), timestamp (timestamp), timestampType (int).
All of the above columns are available to query.
In your example you use only value, so to get the timestamp you just need to add timestamp to your select statement:
val allFields = kafkaDatademostr.selectExpr(
  "CAST(value AS STRING) AS csv",
  "CAST(key AS STRING) AS key",
  "topic",
  "partition",
  "offset",
  "timestamp",
  "timestampType"
)
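From there you can carry timestamp along while splitting the csv column, in the same style as your original code; a sketch:

val csvWithTime = allFields.selectExpr(
  "split(csv, ',')[0] as ddd",
  "split(csv, ',')[1] as DFW", // ...and so on for the remaining fields
  "timestamp"
)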
Upvotes: 1
Reputation: 2448
The timestamp is included in the source schema. Just add timestamp to your select to get it, like below.
val interval = kafkaDatademostr.select(col("value").cast("string").alias("csv"), col("timestamp"))
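If you then want event-time operations based on the Kafka timestamp, that column can be watermarked directly; a minimal sketch (the 10-minute threshold is just an example):

val withEventTime = interval.withWatermark("timestamp", "10 minutes")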
Upvotes: 2