scordata

Reputation: 149

Dynamically create and change Kafka topics with Flink

I'm using Flink to read and write data from different Kafka topics. Specifically, I'm using the FlinkKafkaConsumer and FlinkKafkaProducer.

I'd like to know if it is possible to change the Kafka topics I'm reading from and writing to 'on the fly' based on either logic within my program, or the contents of the records themselves.

For example, if a record with a new field is read, I'd like to create a new topic and start diverting records with that field to the new topic.

Thanks.

Upvotes: 3

Views: 2722

Answers (2)

Nizar

Reputation: 436

If your topics follow a generic naming pattern, for example "topic-n*", your Flink Kafka consumer can automatically read from "topic-n1", "topic-n2", and so on as they are added to Kafka.

Flink 1.5 (FlinkKafkaConsumer09) added support for dynamic partition discovery and topic discovery based on a regex pattern. This means the Flink Kafka consumer can pick up new Kafka partitions (and new topics matching the pattern) without needing to restart the job, while maintaining exactly-once guarantees.

Consumer constructor that accepts subscriptionPattern: link.
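For illustration, here is a minimal sketch of pattern-based subscription with the Flink Kafka consumer. The topic regex, bootstrap address, group id, and discovery interval are assumptions for the example, not values from the question:

```java
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class PatternSubscription {

    // Hypothetical pattern: matches topic-n1, topic-n2, ...
    static final Pattern TOPIC_PATTERN = Pattern.compile("topic-n[0-9]+");

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumption
        props.setProperty("group.id", "demo");                    // assumption
        // Enable periodic discovery of new partitions and of new topics
        // matching the pattern (discovery is disabled by default).
        props.setProperty("flink.partition-discovery.interval-millis", "10000");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // The constructor overload taking a Pattern subscribes by regex.
        env.addSource(new FlinkKafkaConsumer<>(
                    TOPIC_PATTERN, new SimpleStringSchema(), props))
           .print();

        env.execute("pattern-subscription");
    }
}
```

With the discovery interval set, any topic created later whose name matches the pattern is picked up without restarting the job.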

Upvotes: 2

AbhishekN

Reputation: 368

Thinking more about the requirement:

1st step is - You start from one topic (for simplicity) and spawn more topics at runtime based on the data provided, directing the respective messages to these topics. This is entirely possible and does not require complicated code. Use the ZkClient API (or, on newer Kafka versions, the AdminClient API) to check whether the topic name exists; if it does not, create a topic with the new name and start pushing messages into it through a new producer tied to this new topic. You don't need to restart the job to produce messages to a specific topic.

Your initial job then becomes a producer (for the new topics) plus a consumer (for the old topic).
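The topic-creation and routing idea above can be sketched as follows. This uses Kafka's AdminClient in place of the older ZkClient the answer mentions, and routes each record to a topic via Flink's KafkaSerializationSchema; the helper names (`topicFor`, `ensureTopic`) and the comma-delimited record format are assumptions for illustration:

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DynamicTopicRouter implements KafkaSerializationSchema<String> {

    // Hypothetical rule: derive the target topic name from a record field.
    static String topicFor(String field) {
        return "topic-" + field;
    }

    // Create the topic if it does not exist yet (AdminClient is the
    // modern replacement for the ZkClient approach described above).
    static void ensureTopic(String bootstrap, String topic) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrap);
        try (AdminClient admin = AdminClient.create(props)) {
            if (!admin.listTopics().names().get().contains(topic)) {
                admin.createTopics(Collections.singleton(
                        new NewTopic(topic, 1, (short) 1))).all().get();
            }
        }
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        // Assumption: the routing field is the part before the first comma.
        String field = element.split(",", 2)[0];
        return new ProducerRecord<>(topicFor(field),
                element.getBytes(StandardCharsets.UTF_8));
    }
}
```

Because the topic is chosen per record inside `serialize`, a single FlinkKafkaProducer can write to any number of topics without restarting the job.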

2nd step is - You want to consume messages from the new topics. One way is to spawn a new job entirely; you can do this by creating a thread pool at startup and supplying topic names to the worker threads as arguments.
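The thread-pool idea can be sketched like this. The `Runnable` passed in would build and execute a new Flink job reading the given topic; that part is left abstract here, and the class and method names are illustrative, not an existing API. The set guards against launching two jobs for the same topic:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class JobSpawner {

    private final ExecutorService pool = Executors.newFixedThreadPool(4);

    // Tracks topics that already have a running job.
    private final Set<String> running = ConcurrentHashMap.newKeySet();

    /** Returns true if a new job was scheduled for this topic. */
    public boolean spawnIfAbsent(String topic, Runnable job) {
        if (!running.add(topic)) {
            return false; // a job for this topic is already running
        }
        // e.g. a Runnable that builds a StreamExecutionEnvironment,
        // adds a consumer for `topic`, and calls env.execute().
        pool.submit(job);
        return true;
    }
}
```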

Again, be careful with this: too much automation can overload the cluster if a looping bug keeps creating topics. Also consider the possibility of too many topics accumulating over time if the input data is uncontrolled or simply dirty. There may be better architectural approaches, as mentioned in the comments above.

Upvotes: -1
