BigD

Reputation: 888

How to include the Kafka timestamp value as a column in Spark Structured Streaming?

I am looking for a way to add the Kafka timestamp to my Spark Structured Streaming schema. I have extracted the value field from Kafka and am building a DataFrame from it. My issue is that I also need the timestamp field (from Kafka) along with the other columns.

Here is my current code:

val kafkaDatademostr = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "zzzz.xxx.xxx.xxx.com:9002")
  .option("subscribe", "csvstream")
  .load()

val interval = kafkaDatademostr
  .select(col("value").cast("string"))
  .alias("csv")
  .select("csv.*")

val xmlData = interval.selectExpr(
  "split(value, ',')[0] as ddd",
  "split(value, ',')[1] as DFW",
  "split(value, ',')[2] as DTG",
  "split(value, ',')[3] as CDF",
  "split(value, ',')[4] as DFO",
  "split(value, ',')[5] as SAD",
  "split(value, ',')[6] as DER",
  "split(value, ',')[7] as time_for",
  "split(value, ',')[8] as fort"
)

How can I get the timestamp from Kafka and add it as a column along with the other columns?

Upvotes: 1

Views: 4479

Answers (3)

vijayraj34

Reputation: 2415

In my case, I was receiving the values from Kafka in JSON format. The JSON contains the actual data along with the original event time, not the Kafka timestamp. Below is the schema.

import org.apache.spark.sql.types._

val mySchema = StructType(Array(
  StructField("time", LongType),
  StructField("close", DoubleType)
))
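For reference, an incoming message matching this schema would look something like the following (an illustrative sample, not from the original stream):

{"time": 1541898500, "close": 185.92}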

In order to use the watermarking feature of Spark Structured Streaming, I had to cast the time field to the timestamp type.

import spark.implicits._
import org.apache.spark.sql.functions._

val df1 = df.selectExpr("CAST(value AS STRING)").as[String]
  .select(from_json($"value", mySchema).as("data"))
  // Casting a LongType epoch value to timestamp interprets it as seconds since the epoch
  .select(col("data.time").cast("timestamp").alias("time"), col("data.close"))

Now you can use the time field for window operations as well as for watermarking.

val windowedData = df1
  .withWatermark("time", "1 minute")
  .groupBy(
    window(col("time"), "1 minute", "30 seconds"),
    $"close"
  )
  .count()
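For completeness, here is a minimal sketch of how this aggregation could be run as a streaming query; the console sink and "update" output mode are illustrative choices, not part of the original pipeline:

val query = windowedData.writeStream
  .outputMode("update")          // emit only the aggregates updated in each trigger
  .format("console")
  .option("truncate", "false")
  .start()

query.awaitTermination()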

I hope this answer clarifies things.

Upvotes: 0

Bartosz Wardziński

Reputation: 6593

On the official Apache Spark web page you can find the guide: Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher).

There you can find information about the schema of the DataFrame that is loaded from Kafka.

Each row from the Kafka source has the following columns:

  • key - message key
  • value - message value
  • topic - name of the topic the message came from
  • partition - partition the message came from
  • offset - offset of the message
  • timestamp - timestamp of the message
  • timestampType - timestamp type
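These columns can be verified by printing the schema of the DataFrame loaded from Kafka (output shown as comments, nullable flags omitted):

  kafkaDatademostr.printSchema()
  // root
  //  |-- key: binary
  //  |-- value: binary
  //  |-- topic: string
  //  |-- partition: integer
  //  |-- offset: long
  //  |-- timestamp: timestamp
  //  |-- timestampType: integer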

All of the above columns are available to query. In your example you use only value, so to get the timestamp you just need to add timestamp to your select statement:

  val allFields = kafkaDatademostr.selectExpr(
    "CAST(value AS STRING) AS csv",
    "CAST(key AS STRING) AS key",
    "topic",
    "partition",
    "offset",
    "timestamp",
    "timestampType"
  )

Upvotes: 1

Joe Widen

Reputation: 2448

The timestamp is included in the Kafka source schema. Just add timestamp to the select to get it, like below.

val interval = kafkaDatademostr
  .select(col("value").cast("string").alias("csv"), col("timestamp"))
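Applied to the split logic from the question, the Kafka timestamp then simply rides along with the parsed fields (a sketch reusing the question's column names):

val xmlData = interval.selectExpr(
  "split(csv, ',')[0] as ddd",
  "split(csv, ',')[1] as DFW",
  "split(csv, ',')[2] as DTG",
  "split(csv, ',')[3] as CDF",
  "split(csv, ',')[4] as DFO",
  "split(csv, ',')[5] as SAD",
  "split(csv, ',')[6] as DER",
  "split(csv, ',')[7] as time_for",
  "split(csv, ',')[8] as fort",
  "timestamp"
)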

Upvotes: 2
