Leo

Reputation: 1046

Dynamic routing/duplicating to multiple topics in Kafka 2

Kafka 2 added support for dynamic routing through the TopicNameExtractor interface, which returns only a single topic name per record.

Perhaps what I'm about to describe is bad design, but at this stage I'm just curious what's possible in Kafka Streams.

Assuming that every message comes with a list of tags, is there a way to duplicate the message to multiple topics, based on that list of tags?

Upvotes: 1

Views: 855

Answers (2)

Bartosz Wardziński

Reputation: 6583

As Matthias mentioned, you have to duplicate the messages. This can be done easily with KStream::flatMapValues(ValueMapperWithKey ...)

Sample code is below. Each message is duplicated once per entry in tags: List<String>.

Model:

import java.util.List;

public class Person {
    public String name;
    public List<String> tags;
    // Used only to pick the output topic; marked transient so that
    // serializers honoring the keyword leave it out of the payload.
    public transient String mainTag;

    public Person(String name, List<String> tags) {
        this.name = name;
        this.tags = tags;
    }

    public Person(String name, List<String> tags, String mainTag) {
        this.name = name;
        this.tags = tags;
        this.mainTag = mainTag;
    }
}

Application:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// PersonSerdes is a custom Serde<Person> (not shown here).
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, PersonSerdes.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
StreamsBuilder builder = new StreamsBuilder();

KStream<String, Person> input = builder.stream("input");
// Emit one copy of the record per tag, remembering the tag in mainTag.
input.flatMapValues((readOnlyKey, person) ->
        person.tags
                .stream()
                .map(tag -> new Person(person.name, person.tags, tag))
                .collect(Collectors.toList())
// TopicNameExtractor: route each copy to the topic named after its tag.
).to((key, person, recordContext) -> person.mainTag);

// Build the topology and start processing.
new KafkaStreams(builder.build(), props).start();
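
To make the routing above easier to follow, here is a minimal plain-Java sketch of the same fan-out logic with no Kafka dependency. The class TagFanOut, the Routed holder and the fanOut method are hypothetical names used only for illustration; they are not part of the Kafka Streams API. It assumes, as in the application above, that each tag is used directly as the topic name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TagFanOut {
    // Hypothetical holder: one copy of a record plus the topic it is bound for.
    public static final class Routed {
        public final String topic;
        public final String name;

        public Routed(String topic, String name) {
            this.topic = topic;
            this.name = name;
        }
    }

    // Produces one copy per tag, which is what flatMapValues does in the
    // application above; the tag then plays the role of the extracted topic.
    public static List<Routed> fanOut(String name, List<String> tags) {
        List<Routed> out = new ArrayList<>();
        for (String tag : tags) {
            out.add(new Routed(tag, name));
        }
        return out;
    }

    public static void main(String[] args) {
        for (Routed r : fanOut("alice", Arrays.asList("red", "blue"))) {
            System.out.println(r.topic + " <- " + r.name);
        }
    }
}
```

Note that a record with an empty tag list yields no copies at all, so with this approach such a record is not written to any topic.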

Upvotes: 1

Matthias J. Sax

Reputation: 62285

That is not possible at the moment. However, there is already a feature request for it: https://issues.apache.org/jira/browse/KAFKA-7578

At the moment, the only way to write a record to multiple output topics is to duplicate the record and send the copies to multiple sinks.

Upvotes: 1
