Reputation: 1832
I am using Spark Structured Streaming with Kafka, with the topic subscribed as a pattern:
option("subscribePattern", "topic.*")
// Subscribe to a pattern
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
Once the job is started and a new topic matching the pattern is created, say topic.new_topic, the job does not automatically start consuming from it and requires a restart.
Is there a way to automatically pick up new topics matching the pattern without restarting the job?
Spark: 3.0.0
Upvotes: 2
Views: 1205
Reputation: 18525
By default, a KafkaConsumer checks every 5 minutes whether there are new topics or partitions to be consumed. This interval is controlled by the consumer configuration
metadata.max.age.ms: The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.
According to the Kafka Specific Configurations section of the Spark + Kafka Integration Guide, you can set this configuration in Spark by adding the prefix kafka., as shown below:
.option("kafka.metadata.max.age.ms", "1000")
With this setting, a newly created topic that matches the pattern will start being consumed about one second after its creation.
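For reference, here is a minimal sketch combining the question's stream definition with this option (the bootstrap servers and the pattern are the placeholders from the question):

// Pattern subscription with a 1-second metadata refresh, so topics
// created after the job starts are picked up without a restart
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("kafka.metadata.max.age.ms", "1000") // passed through to the underlying consumer
  .load()

Note that a lower refresh interval means more frequent metadata requests to the brokers, so pick the smallest value your latency requirement actually needs.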
(Tested with Spark 3.0.0 and Kafka Broker 2.5.0)
Upvotes: 4