Karl Dailey

Reputation: 118

How to stop streaming from a Kafka topic after a limited time duration or record count?

My ultimate goal is to see whether a Kafka topic is running and whether the data in it is good; otherwise, fail / throw an error.

If I could pull just 100 messages, or pull for just 60 seconds, I think I could accomplish what I wanted. But all the streaming examples / questions I have found online have no intention of shutting down the streaming connection.

Here is the best working code I have so far. It pulls data and displays it, but it keeps trying to pull more data, and if I try to access it in the next line, it hasn't had a chance to pull the data yet. I assume I need some sort of callback. Has anyone done something similar? Is this the best way of going about this?

I am using Databricks notebooks to run my code.

import org.apache.spark.sql.functions.{explode, split}

val kafka = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<kafka server>:9092")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .load()

// Split each message value into whitespace-separated words
val df = kafka.select(explode(split($"value".cast("string"), "\\s+")).as("word"))

display(df.select($"word"))

Upvotes: 2

Views: 820

Answers (1)

zero323

Reputation: 330353

The trick is that you don't need streaming at all. The Kafka source supports batch queries: replace readStream with read and adjust startingOffsets and endingOffsets.

val df = spark
  .read
  .format("kafka")
  ... // Remaining options
  .load()
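
For instance, a minimal sketch of the full batch query, reusing the broker and topic placeholders from the question. The endingOffsets = "latest" option makes the query finite, limit(100) caps how many records are inspected, and the empty-topic check is one possible failure strategy, not something prescribed by the API:

import org.apache.spark.sql.functions.{explode, split}
import spark.implicits._ // already in scope in Databricks notebooks

val batch = spark
  .read // batch query instead of readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<kafka server>:9092")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest") // bound the query so it terminates
  .load()
  .limit(100) // inspect at most 100 records

val words = batch.select(explode(split($"value".cast("string"), "\\s+")).as("word"))

// The query is finite, so actions return instead of blocking forever.
if (words.count() == 0) {
  throw new IllegalStateException("No data found in topic")
}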

You can find examples in the Structured Streaming + Kafka integration guide.

For streaming queries, you can use a once trigger (Trigger.Once), although it might not be the best choice in this case:

df.writeStream
  .trigger(Trigger.Once)
  ... // Handle the output, for example with the foreach sink (?)
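
A minimal end-to-end sketch of the once-trigger variant, assuming the streaming df from the question. The memory sink and the query name kafka_sample are illustrative choices that are convenient in a notebook, and display is Databricks-specific:

import org.apache.spark.sql.streaming.Trigger

val query = df.writeStream
  .trigger(Trigger.Once())
  .format("memory")          // in-memory sink for ad-hoc inspection
  .queryName("kafka_sample") // registers a temporary table under this name
  .start()

query.awaitTermination() // returns once the single micro-batch finishes

display(spark.sql("SELECT * FROM kafka_sample LIMIT 100"))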

You could also use the standard Kafka client to fetch some data without starting a SparkSession.
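
A minimal sketch of that approach with the plain kafka-clients consumer (2.0+ for the Duration-based poll), bounded by both a record cap and a time budget; the broker, topic, and group id are placeholders:

import java.time.Duration
import java.util.{Collections, Properties}
import scala.collection.JavaConverters._ // scala.jdk.CollectionConverters._ on Scala 2.13+
import org.apache.kafka.clients.consumer.KafkaConsumer

val props = new Properties()
props.put("bootstrap.servers", "<kafka server>:9092")
props.put("group.id", "topic-health-check") // illustrative group id
props.put("auto.offset.reset", "earliest")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(Collections.singletonList("<topic>"))

val deadline = System.currentTimeMillis() + 60000L // stop after 60 seconds...
var fetched = Vector.empty[String]
try {
  while (fetched.size < 100 && System.currentTimeMillis() < deadline) { // ...or 100 records
    val records = consumer.poll(Duration.ofSeconds(1))
    fetched ++= records.asScala.map(_.value())
  }
} finally {
  consumer.close()
}

if (fetched.isEmpty) throw new IllegalStateException("No data found in topic")

This sidesteps Spark entirely, which makes it the lightest option for a simple health check.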

Upvotes: 4
