Reputation: 103
Currently, I'm working on an Apache Beam pipeline that consumes data from three different Kafka topics and, after some processing, builds three types of objects from that data. Finally, I need to publish those three objects to three different Kafka topics.
It is possible to read from multiple topics using the withTopics
method in KafkaIO.read
, but I did not find a KafkaIO feature for writing to multiple topics.
I would like some advice on the best way to do this, and I'd appreciate any code examples.
Upvotes: 0
Views: 578
Reputation: 1443
You can use KafkaIO.<K, V>writeRecords()
for that. It takes a PCollection<ProducerRecord<K, V>>
as input, so you just need to specify the required output topic in the ProducerRecord
for every element, or use a default one.
Please take a look at this test as an example.
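A minimal sketch of that approach (assuming the Beam KafkaIO and Kafka client dependencies are on the classpath; chooseTopic is a hypothetical routing function you would supply, and the input is assumed to be a PCollection of KV<String, String> pairs):

```java
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.ProducerRecordCoder;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Wrap each element in a ProducerRecord that names its own target topic.
// chooseTopic(...) is a hypothetical per-element routing function.
PCollection<ProducerRecord<String, String>> records = input
    .apply(MapElements
        .into(new TypeDescriptor<ProducerRecord<String, String>>() {})
        .via(kv -> new ProducerRecord<>(
            chooseTopic(kv.getValue()), kv.getKey(), kv.getValue())))
    .setCoder(ProducerRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

// writeRecords() honors the topic stored inside each ProducerRecord,
// so a single sink can fan out to all three topics.
records.apply(KafkaIO.<String, String>writeRecords()
    .withBootstrapServers("broker_1:9092,broker_2:9092")
    .withKeySerializer(StringSerializer.class)
    .withValueSerializer(StringSerializer.class));
```

The advantage over three separate sinks is that the routing decision lives in one place, and adding a fourth topic only means extending chooseTopic.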
Upvotes: 1
Reputation: 6572
You can do that with 3 different sinks on a PCollection
, for example:
@Rule
public final transient TestPipeline pipeline = TestPipeline.create();

@Test
public void kafkaIOSinksTest() {
    PCollection<String> inputCollection = pipeline.apply(Create.of(Arrays.asList("Object 1", "Object 2")));

    // withValueSerializer expects the serializer class, not an instance
    inputCollection.apply(KafkaIO.<Void, String>write()
        .withBootstrapServers("broker_1:9092,broker_2:9092")
        .withTopic("topic1")
        .withValueSerializer(StringSerializer.class)
        .values());

    inputCollection.apply(KafkaIO.<Void, String>write()
        .withBootstrapServers("broker_1:9092,broker_2:9092")
        .withTopic("topic2")
        .withValueSerializer(StringSerializer.class)
        .values());

    inputCollection.apply(KafkaIO.<Void, String>write()
        .withBootstrapServers("broker_1:9092,broker_2:9092")
        .withTopic("topic3")
        .withValueSerializer(StringSerializer.class)
        .values());

    pipeline.run().waitUntilFinish();
}
In this example, the same PCollection
is written to 3 different topics via multiple sinks.
Upvotes: 2
Reputation: 1357
Some Beam sinks (like BigQueryIO) support "dynamic destinations", but this isn't the case for KafkaIO. You'll need to set up 3 different sinks for the 3 topics, and you'll need to split your messages (perhaps using a Partition
transform) into separate collections to feed into those sinks.
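A sketch of the Partition approach (assuming Beam and the Kafka client on the classpath; typeIndexOf is a hypothetical function returning 0, 1, or 2 depending on which of the three object types an element belongs to):

```java
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.Partition;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.kafka.common.serialization.StringSerializer;

// Split the single input collection into 3 by object type.
// typeIndexOf(...) is a hypothetical classifier you would supply.
PCollectionList<String> parts = input.apply(
    Partition.of(3, (String element, int numPartitions) -> typeIndexOf(element)));

// Attach one KafkaIO sink per partition, each targeting its own topic.
String[] topics = {"topic1", "topic2", "topic3"};
for (int i = 0; i < topics.length; i++) {
    parts.get(i).apply(KafkaIO.<Void, String>write()
        .withBootstrapServers("broker_1:9092,broker_2:9092")
        .withTopic(topics[i])
        .withValueSerializer(StringSerializer.class)
        .values());
}
```

Unlike the multi-sink example above, which writes every element to all three topics, Partition routes each element to exactly one topic, which matches the original requirement of three distinct object types.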
Upvotes: 0