tharindu

Reputation: 523

Consume a Kafka Topic every hour using Spark

I want to consume a Kafka topic as a batch: read the topic once every hour and pick up only the latest hour's data.

val readStream = existingSparkSession
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", hostAddress)
  .option("subscribe", "kafka.raw")
  .load()

But this always reads the same first 20 rows, starting from the very beginning of the topic, so it never picks up the latest data.

How can I read the latest rows on an hourly basis using Scala and Spark?

Upvotes: 0

Views: 1470

Answers (1)

Michael Heil

Reputation: 18515

If you read Kafka messages in batch mode, you need to do the bookkeeping of which data is new and which is not yourself. Remember that Spark will not commit any messages back to Kafka, so every time you restart the batch job it will read from the beginning (or from wherever the option startingOffsets points, which defaults to earliest for batch queries).
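For illustration, a minimal sketch of what that manual bookkeeping looks like in batch mode (the partition number and offset value are placeholders; you would have to persist the last consumed offsets after each run and pass them in again on the next one):

val batchDf = existingSparkSession
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", hostAddress)
  .option("subscribe", "kafka.raw")
  // JSON offset ranges per topic and partition; the values here are made up
  // and would have to be stored and updated by your own code between runs
  .option("startingOffsets", """{"kafka.raw":{"0":1000}}""")
  .option("endingOffsets", "latest")
  .load()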

For your scenario, where you want to run the job once every hour and only process the new data that arrived in Kafka during the previous hour, you can make use of the writeStream trigger option Trigger.Once for streaming queries.

There is a nice blog post from Databricks that explains why a streaming query with Trigger.Once should be preferred over a batch query.

The main point being:

"When you’re running a batch job that performs incremental updates, you generally have to deal with figuring out what data is new, what you should process, and what you should not. Structured Streaming already does all this for you."

Make sure that you also set the option "checkpointLocation" in your writeStream. In the end, you can have a simple cron job that submits your streaming job once an hour.
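A minimal sketch of such a streaming job (the Parquet sink and the output/checkpoint paths are only examples; adapt them to your actual target):

import org.apache.spark.sql.streaming.Trigger

val query = existingSparkSession
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", hostAddress)
  .option("subscribe", "kafka.raw")
  .load()
  .writeStream
  .format("parquet")                                         // example sink, use whatever you write to
  .option("path", "/tmp/kafka-raw-output")                   // example output path
  .option("checkpointLocation", "/tmp/kafka-raw-checkpoint") // Spark tracks the consumed offsets here
  .trigger(Trigger.Once())
  .start()

query.awaitTermination()

With the checkpoint in place, each hourly run picks up exactly where the previous one left off and stops once it has processed everything that is currently available in the topic.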

Upvotes: 2
