Reputation: 2157
My end goal
is to write out
and read
the aggregated data to the new Kafka topic
in the batches it gets processed. I followed the official documentation
and a couple of other posts but no luck. I would first read the topic, perform aggregation, save the results in another Kafka topic, and again read the topic and print it in the console. Below is my code:
package com.sparkKafka
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming._
import scala.concurrent.duration._
object SparkKafkaTopic3 {
def main(ar: Array[String]) {
val spark = SparkSession.builder().appName("SparkKafka").master("local[*]").getOrCreate()
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "songDemo5")
.option("startingOffsets", "earliest")
.load()
import spark.implicits._
df.printSchema()
val newDf = df.select($"value".cast("string"), $"timestamp").select(split(col("value"), ",")(0).as("userName"), split(col("value"), ",")(1).as("songName"), col("timestamp"))
val windowedCount = newDf
.withWatermark("timestamp", "40000 milliseconds")
.groupBy(
window(col("timestamp"), "20 seconds"), col("songName"))
.agg(count(col("songName")).alias("numberOfTimes"))
val outputTopic = windowedCount
.select(struct("*").cast("string").as("value")) // Added this line.
.writeStream
.format("kafka")
.option("topic", "songDemo6")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("checkpointLocation", "/tmp/spark_ss/")
.start()
val finalOutput = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "songDemo6").option("startingOffsets", "earliest")
.load()
.writeStream.format("console")
.outputMode("append").start()
spark.streams.awaitAnyTermination()
}
}
When I run this, in the console initially there is a below exception
java.lang.IllegalStateException: Cannot find earliest offsets of Set(songDemo4-0). Some data may have been missed.
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed. If you don't want your streaming query to fail on such cases, set the
source option "failOnDataLoss" to "false".
Also, if I try to run this code without
writing to the topic part and reading it again everything works fine.
I tried to read the topic from the shell using consumer command
but no records are displayed. Is there anything that I am missing over here?
Below is my dataset:
>sid,Believer
>sid,Thunder
>sid,Stairway to heaven
>sid,Heaven
>sid,Heaven
>sid,thunder
>sid,Believer
When I ran @Srinivas's code and after reading the new topic I am getting data as below:
[[2020-06-07 18:18:40, 2020-06-07 18:19:00], Heaven, 1]
[[2020-06-07 18:17:00, 2020-06-07 18:17:20], Believer, 1]
[[2020-06-07 18:18:40, 2020-06-07 18:19:00], Heaven, 1]
[[2020-06-07 18:17:00, 2020-06-07 18:17:20], Believer, 1]
[[2020-06-07 18:17:00, 2020-06-07 18:17:20], Stairway to heaven, 1]
[[2020-06-07 18:40:40, 2020-06-07 18:41:00], Heaven, 1]
[[2020-06-07 18:17:00, 2020-06-07 18:17:20], Thunder, 1]
Here you can see for Believer the window frame is the same but still, the entries are separate. Why is it so? It should be single entry with count 2 since the window frame is the same
Upvotes: 0
Views: 1227
Reputation: 10372
Check below code.
Added this windowedCount.select(struct("*").cast("string").as("value"))
before you write anything to kafka you have to convert all columns of type string
alias of that column is value
val spark = SparkSession.builder().appName("SparkKafka").master("local[*]").getOrCreate()
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "songDemo")
.option("startingOffsets", "earliest")
.load()
import spark.implicits._
df.printSchema()
val newDf = df.select($"value".cast("string"),$"timestamp").select(split(col("value"), ",")(0).as("userName"), split(col("value"), ",")(1).as("songName"), col("timestamp"))
val windowedCount = newDf
.withWatermark("timestamp", "40000 milliseconds")
.groupBy(
window(col("timestamp"), "20 seconds"), col("songName"))
.agg(count(col("songName")).alias("numberOfTimes"))
val outputTopic = windowedCount
.select(struct("*").cast("string").as("value")) // Added this line.
.writeStream
.format("kafka")
.option("topic", "songDemoA")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("checkpointLocation", "/tmp/spark_ss/")
.start()
val finalOutput = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "songDemoA").option("startingOffsets", "earliest")
.load()
.writeStream.format("console")
.outputMode("append").start()
spark.streams.awaitAnyTermination()
Updated - Ordering Output
val windowedCount = newDf
.withWatermark("timestamp", "40000 milliseconds")
.groupBy(
window(col("timestamp"), "20 seconds"), col("songName"))
.agg(count(col("songName")).alias("numberOfTimes"))
.orderBy($"window.start".asc) // Add this line if you want order.
Ordering or sorting result works only if you use output mode is complete
for any other values it will throw an error.
For example check below code.
val outputTopic = windowedCount
.writeStream
.format("console")
.option("truncate","false")
.outputMode("complete")
.start()
Upvotes: 4