JDev

Reputation: 1832

How to subscribe to a new topic with subscribePattern?

I am using Spark Structured Streaming with Kafka and subscribe to topics as a pattern:

option("subscribePattern", "topic.*")

// Subscribe to a pattern
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()

Once the job is running and a new topic matching the pattern is created, say topic.new_topic, the job doesn't automatically start listening to it and requires a restart.

Is there a way to automatically subscribe to new topics matching the pattern without restarting the job?

Spark: 3.0.0

Upvotes: 2

Views: 1205

Answers (1)

Michael Heil

Reputation: 18525

By default, a KafkaConsumer checks every 5 minutes whether there are new partitions to be consumed. This interval is controlled by the Consumer config

metadata.max.age.ms: The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.

According to the Spark + Kafka Integration Guide, section on Kafka Specific Configurations, you can set this consumer config by adding the prefix kafka., as shown below:

.option("kafka.metadata.max.age.ms", "1000")

With this setting, a newly created topic matching the pattern will start being consumed about 1 second after its creation.
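As a minimal sketch, here is the stream definition from the question with the suggested option added; the bootstrap servers and topic pattern are placeholders carried over from the question:

// Subscribe to a pattern and refresh Kafka metadata every second,
// so newly created topics matching the pattern are discovered
// without restarting the job
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("kafka.metadata.max.age.ms", "1000")
  .load()

Note that very low values put extra load on the brokers, so in practice you may want a refresh interval larger than 1 second.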

(Tested with Spark 3.0.0 and Kafka Broker 2.5.0)

Upvotes: 4
