Reputation: 6364
How to read from Kafka and Query from an external store like Cassandra in Spark Structured Streaming?
I get a stream of messages from Kafka and I would like to apply a map operation on it; for each key, I want to query a datastore like Cassandra, fetch more information for that key, and apply further operations on the stream. How do I do that using Spark Structured Streaming 2.2.0?
Upvotes: 1
Views: 133
Reputation: 3250
To read from Kafka as a stream:
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("kafka-reading")
  .getOrCreate()

import spark.implicits._ // needed for .as[(String, String)]

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("startingOffsets", "latest")
  .option("subscribe", topicName)
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
If the structure of the value is complex, you have to provide its schema and parse it while casting.
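For example, here is a minimal sketch using from_json with a hypothetical two-field JSON payload; rawDf stands for the raw DataFrame returned by load() above, before the cast to (String, String):

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

// Hypothetical schema of the JSON carried in the Kafka value
val valueSchema = StructType(Seq(
  StructField("id", StringType),
  StructField("amount", DoubleType)
))

val parsed = rawDf
  .selectExpr("CAST(value AS STRING) AS json")
  .select(from_json($"json", valueSchema).as("data"))
  .select("data.*") // flatten into columns id, amount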
To perform operations on it, first use watermarking to accumulate data over a certain window, e.g. 10 seconds. After watermarking, you can apply groupBy to aggregate: collect the keys and values as a list, then traverse the list of keys and fetch data from Cassandra for each key. For complete details on watermarking and aggregation, refer to the Structured Streaming documentation.
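A hedged sketch of that approach, assuming the Kafka "timestamp" column is retained alongside key/value (e.g. via selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")); kafkaDf names that DataFrame and reuses the spark.implicits._ import above:

import org.apache.spark.sql.functions.{window, collect_list}

// Accumulate values per key over 10-second event-time windows,
// tolerating data up to 10 seconds late
val grouped = kafkaDf
  .withWatermark("timestamp", "10 seconds")
  .groupBy(window($"timestamp", "10 seconds"), $"key")
  .agg(collect_list($"value").as("values"))

Each resulting (key, values) row can then be enriched by looking up the key in Cassandra inside the sink, e.g. in a custom ForeachWriter, since Spark 2.2 has no foreachBatch.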
Upvotes: 0
Reputation: 2108
A Kafka structured stream can be joined with a static DataFrame. As per the documentation, you are able to do this:
val staticDf = spark.read. ... // read from Cassandra
val streamingDf = spark.readStream. ... // read from stream

// example of joins to get information from both Cassandra and the stream
streamingDf.join(staticDf, "type")                // inner equi-join with a static DF
streamingDf.join(staticDf, "type", "right_outer") // right outer join with a static DF
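A minimal sketch of the static side, assuming the DataStax spark-cassandra-connector is on the classpath; the keyspace and table names are placeholders:

// Static lookup DataFrame read from Cassandra via the DataStax connector
val staticDf = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "my_keyspace", "table" -> "my_table"))
  .load()

// Enrich the Kafka stream with Cassandra data: inner equi-join on the shared "type" column
val enriched = streamingDf.join(staticDf, "type")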
Upvotes: 1