user1870400

Reputation: 6364

How to read from Kafka and Query from an external store like Cassandra in Spark Structured Streaming?


I receive a stream of messages from Kafka and would like to apply a map operation to it. For each key, I would like to query a datastore such as Cassandra to get more information for that key, and then apply further operations on the stream. How do I do that using Spark Structured Streaming 2.2.0?

Upvotes: 1

Views: 133

Answers (2)

Mahesh Chand

Reputation: 3250

To read from Kafka as a stream:

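import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("kafka-reading")
  .getOrCreate()

// Needed for the .as[(String, String)] encoder below
import spark.implicits._

// topicName is assumed to hold the name of the Kafka topic to subscribe to
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("startingOffsets", "latest")
  .option("subscribe", topicName)
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]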

If the message value has a complex structure (for example, JSON), casting it to a string is not enough: you also have to provide a schema to parse it into columns.
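As a rough illustration of providing a schema, the sketch below parses a JSON value with `from_json`. The field names (`id`, `type`, `amount`) and the `ParseValue` object are hypothetical; adapt them to your actual payload.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{LongType, StringType, StructType}

object ParseValue {
  // Hypothetical schema of the JSON payload carried in the Kafka value
  val valueSchema: StructType = new StructType()
    .add("id", StringType)
    .add("type", StringType)
    .add("amount", LongType)

  // Takes the raw Kafka DataFrame (binary key and value columns)
  // and returns the key plus one column per schema field.
  def parse(kafkaDf: DataFrame): DataFrame =
    kafkaDf
      .select(
        col("key").cast("string").as("key"),
        from_json(col("value").cast("string"), valueSchema).as("data"))
      .select("key", "data.*")
}
```

Calling `ParseValue.parse` on the DataFrame returned by `.load()` (before any `selectExpr`) should yield typed columns you can group or join on.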

To perform the lookup, first group the stream over a time window, for example 10 seconds, using a watermark to bound how long late data is waited for. With the watermark in place, you can apply groupBy and aggregate, collecting the keys and values into lists. Then traverse the list of keys and fetch the data for each key from Cassandra. For complete information on watermarking and aggregation, refer to the Structured Streaming documentation.
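A minimal sketch of this approach, under several assumptions: the topic name `events` is made up, `lookup` is a hypothetical placeholder for a real Cassandra query, and the results are simply printed from a `ForeachWriter`. It uses the `timestamp` column that the Kafka source provides as the event time.

```scala
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.functions.{col, collect_list, window}

object WindowedLookup {
  // Hypothetical stand-in for a Cassandra query; replace with a real driver call.
  def lookup(key: String): Option[String] = None

  // Pure helper: combine a key, its collected values, and the looked-up info.
  def enrich(key: String, values: Seq[String], find: String => Option[String]): String =
    s"$key -> ${values.mkString(",")} [${find(key).getOrElse("no match")}]"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("windowed-lookup").getOrCreate()

    val messages = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "events") // hypothetical topic name
      .load()
      // Keep the Kafka record timestamp so it can serve as event time.
      .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value", "timestamp")

    // Collect the values seen for each key in 10-second windows.
    val grouped = messages
      .withWatermark("timestamp", "10 seconds")
      .groupBy(window(col("timestamp"), "10 seconds"), col("key"))
      .agg(collect_list(col("value")).as("values"))

    val query = grouped.writeStream
      .outputMode("append")
      .foreach(new ForeachWriter[Row] {
        // A real implementation would open a Cassandra session here.
        def open(partitionId: Long, version: Long): Boolean = true
        def process(row: Row): Unit =
          println(enrich(row.getAs[String]("key"), row.getAs[Seq[String]]("values"), lookup))
        // ...and close the session here.
        def close(errorOrNull: Throwable): Unit = ()
      })
      .start()

    query.awaitTermination()
  }
}
```

Keeping the lookup inside the writer means one query per key per window; whether that is acceptable depends on your key cardinality and Cassandra load.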

Upvotes: 0

dumitru

Reputation: 2108

A Kafka structured stream can be joined with a static DataFrame. As per the documentation, you can do this:

val staticDf = spark.read. ... // read from Cassandra
val streamingDf = spark.readStream. ... // read from stream

// example of join to get information from both Cassandra and stream
streamingDf.join(staticDf, "type")                     // inner equi-join with a static DF
streamingDf.join(staticDf, Seq("type"), "left_outer")  // left outer join with a static DF
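For a fuller picture, here is a sketch of how the two reads might look, assuming the DataStax spark-cassandra-connector is on the classpath. The keyspace, table, topic, and host values are hypothetical, and the Cassandra table is assumed to have a text column named `key` that matches the Kafka message key.

```scala
import org.apache.spark.sql.SparkSession

object StreamStaticJoin {
  // Hypothetical keyspace and table names
  val cassandraOptions: Map[String, String] =
    Map("keyspace" -> "my_keyspace", "table" -> "key_details")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("stream-static-join")
      .config("spark.cassandra.connection.host", "127.0.0.1") // assumed Cassandra host
      .getOrCreate()

    // Static side: the Cassandra table read through the connector's data source.
    val staticDf = spark.read
      .format("org.apache.spark.sql.cassandra")
      .options(cassandraOptions)
      .load()

    // Streaming side: Kafka messages with key and value cast to strings.
    val streamingDf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "events") // hypothetical topic name
      .load()
      .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")

    // Inner equi-join on the shared `key` column.
    val enriched = streamingDf.join(staticDf, "key")

    enriched.writeStream
      .outputMode("append")
      .format("console")
      .start()
      .awaitTermination()
  }
}
```

This avoids hand-written per-key queries, at the cost of depending on the connector to read the table efficiently.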

Upvotes: 1
