Reputation: 75
I'm using KafkaSink as the sink in my Flink application, and I need to send stringified JSON records to different Kafka topics based on some key-value pairs in the data (for example, some JSONs go to topic1, others go to topic2, and so on). I couldn't find anything in the documentation about choosing the Kafka topic based on the incoming data stream. Can someone please help me with this?
NOTE: I'm using Flink version 1.14.3.
DataStream<String> data = .....
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers(parameter.get("bootstrap.servers"))
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(parameter.get("kafka.output.topic"))
                .setValueSerializationSchema(new SimpleStringSchema())
                .build()
        )
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();
data.sinkTo(sink);
Upvotes: 1
Views: 1506
Reputation: 75
I was able to write to multiple Kafka topics by implementing KafkaRecordSerializationSchema with a custom serialize method, as suggested by @DavidAnderson. The code snippet is below.
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CustomSchema implements KafkaRecordSerializationSchema<Tuple2<String, String>> {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, String> input, KafkaSinkContext context, Long timestamp) {
        String topic = input.f0; // f0 carries the destination topic
        String data = input.f1;  // f1 carries the stringified JSON payload
        // getBytes(Charset) cannot throw UnsupportedEncodingException, so no try/catch is needed
        byte[] value = data == null ? null : data.getBytes(StandardCharsets.UTF_8);
        return new ProducerRecord<>(topic, value);
    }
}
I then configured the Kafka sink to use this schema via the setRecordSerializer method.
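For completeness, that wiring might look roughly like the fragment below. It is a sketch rather than tested code: it reuses the builder calls and the parameter object from the question, and routed is a hypothetical DataStream<Tuple2<String, String>> of (topic, JSON payload) pairs produced upstream.

```java
// Fragment only: needs the Flink Kafka connector on the classpath.
// `routed` is a hypothetical stream of (topic, JSON payload) pairs.
KafkaSink<Tuple2<String, String>> sink = KafkaSink.<Tuple2<String, String>>builder()
        .setBootstrapServers(parameter.get("bootstrap.servers"))
        // No setTopic here: the topic is chosen per record inside CustomSchema.serialize
        .setRecordSerializer(new CustomSchema())
        // Same delivery-guarantee call as in the question
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();
routed.sinkTo(sink);
```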
Upvotes: 0
Reputation: 43409
I haven't tried this, but I believe that rather than using setTopic to hardwire the sink to a specific topic, you can instead implement the serialize method on a custom KafkaRecordSerializationSchema so that each ProducerRecord it returns specifies the topic it should be written to.
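To make the per-record topic choice concrete, here is a minimal plain-Java sketch of the lookup a custom serialize method could delegate to. Everything in it is an assumption for illustration: the class name TopicRouter, the method topicFor, and the idea that the routing value has already been extracted from the JSON.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: maps a routing value taken from a record to a Kafka topic name. */
final class TopicRouter implements Serializable {
    private static final long serialVersionUID = 1L;

    private final HashMap<String, String> topicByKey; // routing value -> topic name
    private final String defaultTopic;                // used when no mapping matches

    TopicRouter(Map<String, String> topicByKey, String defaultTopic) {
        // Defensive copy so later changes to the caller's map do not affect routing
        this.topicByKey = new HashMap<>(topicByKey);
        this.defaultTopic = defaultTopic;
    }

    /** Returns the mapped topic, or the default topic for unknown or null keys. */
    String topicFor(String key) {
        return topicByKey.getOrDefault(key, defaultTopic);
    }
}
```

Inside serialize, the result of topicFor could be passed as the topic argument of the ProducerRecord constructor. The helper is Serializable because a serialization schema, including its fields, is shipped to the task managers.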
Another option would be to create a separate sink object for every topic, and then use a ProcessFunction that fans out to a set of side outputs, each connected to the appropriate sink.
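A rough, untested sketch of that second option follows. It needs the Flink streaming and Kafka connector dependencies on the classpath. The class name SideOutputFanOut, the method attachSinks, the two-topic split, and the (topic, payload) input shape are all assumptions made for illustration; records for topic2 go to a side output and everything else stays on the main output.

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputFanOut {
    // Anonymous subclass so Flink can capture the side output's element type
    static final OutputTag<String> TOPIC2 = new OutputTag<String>("topic2") {};

    // Hypothetical helper: each sink is assumed to be built with setTopic for its own topic
    static void attachSinks(DataStream<Tuple2<String, String>> routed,
                            KafkaSink<String> topic1Sink,
                            KafkaSink<String> topic2Sink) {
        SingleOutputStreamOperator<String> main = routed.process(
                new ProcessFunction<Tuple2<String, String>, String>() {
                    @Override
                    public void processElement(Tuple2<String, String> in, Context ctx, Collector<String> out) {
                        if ("topic2".equals(in.f0)) {
                            ctx.output(TOPIC2, in.f1); // side output for topic2
                        } else {
                            out.collect(in.f1);        // main output for everything else
                        }
                    }
                });
        main.sinkTo(topic1Sink);
        main.getSideOutput(TOPIC2).sinkTo(topic2Sink);
    }
}
```

Compared with a custom serialization schema, this keeps each sink simple but requires one OutputTag and one sink per topic, so it fits best when the set of topics is small and known up front.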
Upvotes: 3