bzak

Reputation: 495

Spark 2.4.0 Structured Streaming Kafka Consumer Checkpointing

I am using Spark 2.4.0 Structured Streaming in batch mode (i.e. spark.read rather than spark.readStream) to consume a Kafka topic. I am checkpointing the read offsets and using the .option("startingOffsets", ...) setting to dictate where to continue reading on the next job run.
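For reference, the read looks roughly like this (a minimal sketch; the bootstrap servers, topic name, and the offset values in the JSON are placeholders, with the real offsets loaded from my checkpoint storage):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-batch-consumer").getOrCreate()

# startingOffsets as a JSON string: one entry per partition of the topic.
# Concrete offsets come from my checkpoint; -2 can be used to mean earliest.
starting_offsets = """{"my-topic": {"0": 1500, "1": 2300}}"""

df = (spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "my-topic")
      .option("startingOffsets", starting_offsets)
      .option("endingOffsets", "latest")
      .load())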

In the docs it says "Newly discovered partitions during a query will start at earliest." However, testing showed that when a new partition is added and I use the last checkpoint info, I get the following error: Caused by: java.lang.AssertionError: assertion failed: If startingOffsets contains specific offsets, you must specify all TopicPartitions.

How can I check programmatically if any new partitions were created so that I can update my startingOffsets param?

Upvotes: 0

Views: 176

Answers (1)

algorythms

Reputation: 1585

To handle new partitions in Kafka with Spark Structured Streaming, you could try this:

  1. First, fetch the Kafka topic's partitions using the list_topics() function from the AdminClient API (confluent_kafka in Python).
  2. Compare these with the checkpointed offsets.
  3. For new partitions, set the starting offsets to "earliest" or any desired value. For existing partitions, use checkpointed offsets.
  4. Pass these offsets to Spark's startingOffsets option (see the sketch after the AdminClient example below).

Here is an example of using AdminClient:

from confluent_kafka.admin import AdminClient

# Connect to the Kafka cluster's admin API.
admin_client = AdminClient({'bootstrap.servers': 'localhost:9092'})

# list_topics() returns cluster metadata; .topics is a dict mapping
# topic name -> TopicMetadata, which in turn holds the partitions.
topics_metadata = admin_client.list_topics().topics
for topic, metadata in topics_metadata.items():
    print(f"Topic: {topic}")
    for partition in metadata.partitions.values():
        print(f"Partition: {partition.id}")

Upvotes: 0
