Reputation: 495
I am using Spark 2.4.0 Structured Streaming in batch mode (i.e. spark.read rather than spark.readStream) to consume a Kafka topic. I am checkpointing the read offsets myself and passing them back via .option("startingOffsets", ...)
to dictate where to continue reading on the next job run.
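For reference, a minimal sketch of my batch read setup (the topic name, bootstrap servers, and offsets JSON below are placeholders, not my real values):

df = (spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "my-topic")
    # JSON from the last checkpoint: per-partition offsets to start from
    .option("startingOffsets", """{"my-topic": {"0": 42, "1": 100}}""")
    .load())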
The docs say that "Newly discovered partitions during a query will start at earliest."
However, testing showed that when a new partition is added and I reuse the last checkpointed offsets, I get the following error:
Caused by: java.lang.AssertionError: assertion failed: If startingOffsets contains specific offsets, you must specify all TopicPartitions.
How can I check programmatically if any new partitions were created so that I can update my startingOffsets param?
Upvotes: 0
Views: 176
Reputation: 1585
To handle new partitions in Kafka with Spark Structured Streaming, you can query the topic's partition metadata before each run and compare it against your checkpoint. Here is an example of listing partitions with the AdminClient from confluent-kafka:
from confluent_kafka.admin import AdminClient

admin_client = AdminClient({'bootstrap.servers': 'localhost:9092'})

# list_topics() returns ClusterMetadata; .topics maps topic name -> TopicMetadata
topics_metadata = admin_client.list_topics().topics

for topic, metadata in topics_metadata.items():
    print(f"Topic: {topic}")
    # .partitions maps partition id -> PartitionMetadata
    for partition in metadata.partitions.values():
        print(f"Partition: {partition.id}")
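Building on that, here is a sketch of how you could merge the live partition list with your checkpoint to produce a complete startingOffsets value (build_starting_offsets and checkpointed_offsets are illustrative names; in the offsets JSON, -2 tells Spark to start a partition at earliest and -1 at latest):

import json
from confluent_kafka.admin import AdminClient

def build_starting_offsets(topic, checkpointed_offsets, bootstrap_servers='localhost:9092'):
    # checkpointed_offsets: dict of partition id (int) -> next offset to read,
    # loaded from wherever you persist your checkpoint.
    admin_client = AdminClient({'bootstrap.servers': bootstrap_servers})
    metadata = admin_client.list_topics(topic=topic).topics[topic]
    # Cover every partition that currently exists; newly created partitions
    # missing from the checkpoint start at -2 (earliest).
    offsets = {str(p_id): checkpointed_offsets.get(p_id, -2)
               for p_id in metadata.partitions}
    return json.dumps({topic: offsets})

The returned JSON string can be passed directly to .option("startingOffsets", ...), which satisfies the assertion that all TopicPartitions must be specified.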
Upvotes: 0