Vinay Cheguri

Reputation: 75

How to select kafka topic dynamically in apache flink kafka sink?

I'm using KafkaSink as the sink in my Flink application, and I need to send stringified JSONs to different Kafka topics based on some key-value pairs (for example, some JSONs go to topic1, others go to another topic, topic2, and so on). But I couldn't find any way in the documentation to choose the Kafka topic based on the incoming data stream. Can someone please help me with this?

NOTE: I'm using Flink version 1.14.3

    DataStream<String> data = .....
    KafkaSink<String> sink = KafkaSink.<String>builder()
            .setBootstrapServers(parameter.get("bootstrap.servers"))
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic(parameter.get("kafka.output.topic"))
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build()
            )
            .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();
    data.sinkTo(sink);
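For example, the kind of key-based routing I have in mind could be expressed as a small helper like this (the topic names and the "type" key here are just placeholders, not my real schema):

```java
// Placeholder routing logic: pick a topic per record based on a key-value
// pair inside the stringified JSON. "type", "topic1", and "topic2" are
// illustrative only.
public class TopicRouter {
    public static String chooseTopic(String json) {
        if (json.contains("\"type\":\"order\"")) {
            return "topic1"; // orders go to the first topic
        }
        return "topic2";     // everything else goes to the second topic
    }
}
```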

Upvotes: 1

Views: 1506

Answers (2)

Vinay Cheguri

Reputation: 75

I was able to sink output to multiple Kafka topics by implementing KafkaRecordSerializationSchema with a custom serialize method, as @DavidAnderson suggested. The code snippet is below.

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;

public class CustomSchema implements KafkaRecordSerializationSchema<Tuple2<String, String>> {

    private final String encoding = StandardCharsets.UTF_8.name();

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, String> input, KafkaSinkContext context, Long timestamp) {
        String topic = input.f0; // the target topic travels with each record
        String data = input.f1;  // the stringified JSON payload
        try {
            byte[] value = data == null ? null : data.getBytes(this.encoding);
            return new ProducerRecord<>(topic, value);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException(
                    "Error when serializing string to byte[] due to unsupported encoding " + this.encoding, e);
        }
    }
}

And I configured the Kafka sink to use this schema via the setRecordSerializer method.
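For completeness, the wiring looks roughly like this (an untested sketch; the parameter names are the ones from the question, and the upstream operator producing (topic, payload) tuples is assumed):

```java
// Sketch: the stream carries (topic, payload) tuples, and CustomSchema
// routes each record to the topic in its first field, so no setTopic call
// is needed on the builder.
DataStream<Tuple2<String, String>> routed = ...; // assumed upstream: (topic, json) pairs

KafkaSink<Tuple2<String, String>> sink = KafkaSink.<Tuple2<String, String>>builder()
        .setBootstrapServers(parameter.get("bootstrap.servers"))
        .setRecordSerializer(new CustomSchema())
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();

routed.sinkTo(sink);
```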

Upvotes: 0

David Anderson

Reputation: 43409

I haven't tried this, but I believe that rather than using setTopic to hardwire the sink to a specific topic, you can instead implement the serialize method on a custom KafkaRecordSerializationSchema so that each ProducerRecord it returns specifies the topic it should be written to.

Another option would be to create a separate sink object for every topic, and then use a ProcessFunction that fans out to a set of side outputs, each connected to the appropriate sink.
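I haven't tested it, but the side-output variant would look roughly like this (the tag name, routing condition, and the pre-built topic1Sink/topic2Sink are placeholders):

```java
// Sketch: route records to two sinks via a side output. Each sink is a
// KafkaSink<String> built as in the question, hardwired to one topic.
final OutputTag<String> topic2Tag = new OutputTag<String>("topic2-records") {};

KafkaSink<String> topic1Sink = ...; // built with setTopic("topic1")
KafkaSink<String> topic2Sink = ...; // built with setTopic("topic2")

SingleOutputStreamOperator<String> mainStream = data.process(
        new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) {
                if (value.contains("\"type\":\"b\"")) { // assumed routing condition
                    ctx.output(topic2Tag, value);       // side output -> topic2 sink
                } else {
                    out.collect(value);                 // main output -> topic1 sink
                }
            }
        });

mainStream.sinkTo(topic1Sink);
mainStream.getSideOutput(topic2Tag).sinkTo(topic2Sink);
```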

Upvotes: 3
