Prasad

Reputation: 103

Apache Beam KafkaIO - Write to Multiple Topics

Currently, I'm working on an Apache Beam pipeline that consumes data from three different Kafka topics and, after some processing, creates three types of objects from that data. Finally, those three objects need to be published to three different Kafka topics. It is possible to read from multiple topics using the withTopics method of KafkaIO.read, but I did not find a KafkaIO feature for writing to multiple topics.

I would like some advice on the most ideal way to do this, and I would appreciate it if anyone could provide some code examples.

Upvotes: 0

Views: 578

Answers (3)

Alexey Romanenko

Reputation: 1443

You can use KafkaIO.<K, V>writeRecords() for that. It takes a PCollection<ProducerRecord<K, V>> as input, so you just need to specify the required output topic in the ProducerRecord for every element, or use a default one.

Please take a look at this test as an example.
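
A minimal sketch of that approach, assuming the input is a PCollection<String> named input; the prefix-based routing rule, topic names, and broker addresses below are placeholders, and the element is reused as the record key purely for illustration:

    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.io.kafka.ProducerRecordCoder;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    // Wrap each element in a ProducerRecord whose topic is chosen per element.
    // The routing rule and topic names are placeholders; the element is reused
    // as the record key purely for illustration.
    PCollection<ProducerRecord<String, String>> records = input
            .apply(MapElements
                    .into(new TypeDescriptor<ProducerRecord<String, String>>() {})
                    .via((String value) -> new ProducerRecord<>(
                            value.startsWith("A") ? "topic1" : "topic2", value, value)))
            .setCoder(ProducerRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

    // writeRecords() sends each record to the topic embedded in it.
    records.apply(KafkaIO.<String, String>writeRecords()
            .withBootstrapServers("broker_1:9092,broker_2:9092")
            .withKeySerializer(StringSerializer.class)
            .withValueSerializer(StringSerializer.class));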

Upvotes: 1

Mazlum Tosun

Reputation: 6572

You can do that with 3 different sinks on a PCollection, for example:

    import java.util.Arrays;

    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.testing.TestPipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.junit.Rule;
    import org.junit.Test;

    // TestPipeline must be exposed as a public JUnit rule so Beam can manage its lifecycle.
    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    @Test
    public void kafkaIOSinksTest() {
        PCollection<String> inputCollection =
                pipeline.apply(Create.of(Arrays.asList("Object 1", "Object 2")));

        // Each apply() below attaches an independent Kafka sink to the same collection.
        inputCollection.apply(KafkaIO.<Void, String>write()
                .withBootstrapServers("broker_1:9092,broker_2:9092")
                .withTopic("topic1")
                .withValueSerializer(StringSerializer.class)
                .values());

        inputCollection.apply(KafkaIO.<Void, String>write()
                .withBootstrapServers("broker_1:9092,broker_2:9092")
                .withTopic("topic2")
                .withValueSerializer(StringSerializer.class)
                .values());

        inputCollection.apply(KafkaIO.<Void, String>write()
                .withBootstrapServers("broker_1:9092,broker_2:9092")
                .withTopic("topic3")
                .withValueSerializer(StringSerializer.class)
                .values());

        // Nothing is sent to Kafka until the pipeline actually runs.
        pipeline.run().waitUntilFinish();
    }

In this example, the same PCollection is written to 3 different topics via multiple sinks.

Upvotes: 2

Jeff Klukas

Reputation: 1357

Some Beam sinks (like BigQueryIO) support "dynamic destinations", but this isn't the case for KafkaIO. You'll need to set up 3 different sinks for the different topics, and you'll need to split your messages (perhaps using a Partition transform) into separate collections to feed into those sinks.
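
A minimal sketch of that split-then-sink pattern, assuming the input is a PCollection<String> named input; the partitioning logic, topic names, and broker addresses are placeholders:

    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.transforms.Partition;
    import org.apache.beam.sdk.values.PCollectionList;
    import org.apache.kafka.common.serialization.StringSerializer;

    // Split the input into 3 collections; replace the placeholder partition
    // function with whatever actually distinguishes your 3 object types.
    PCollectionList<String> parts = input.apply(Partition.of(3,
            (String elem, int numPartitions) -> Math.floorMod(elem.hashCode(), numPartitions)));

    // One sink per topic: each partition feeds its own KafkaIO write.
    String[] topics = {"topic1", "topic2", "topic3"};
    for (int i = 0; i < topics.length; i++) {
        parts.get(i).apply(KafkaIO.<Void, String>write()
                .withBootstrapServers("broker_1:9092,broker_2:9092")
                .withTopic(topics[i])
                .withValueSerializer(StringSerializer.class)
                .values());
    }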

Upvotes: 0
