Reputation: 297
In Flink 1.14, how can I have a single sink that writes both to Kafka and to some other data source?
I've tried creating a custom sink which extends RichSinkFunction. In the open method, I've also initialised a KafkaSink. However, KafkaSink has no equivalent of an invoke method which I could call from the invoke method of my own custom sink.
The documentation says the usage of KafkaSink is stream.sinkTo(kafkaSink). That approach wouldn't work for me because, in the sink, I make a decision of whether to write to Kafka or not.
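To illustrate, something like this (class, topic and config values are just placeholders, not my actual code):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class MyDualSink extends RichSinkFunction<String> {

    private KafkaSink<String> kafkaSink;

    @Override
    public void open(Configuration parameters) {
        // Initialise a KafkaSink here, as described above (placeholder config)
        kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("my-topic")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .build();
    }

    @Override
    public void invoke(String value, Context context) {
        writeToOtherDataSource(value);
        if (shouldWriteToKafka(value)) {
            // Problem: KafkaSink has no per-record method (no invoke equivalent) to call here
        }
    }

    // Placeholders for the "other data source" write and the routing decision
    private void writeToOtherDataSource(String value) { }
    private boolean shouldWriteToKafka(String value) { return value.startsWith("kafka:"); }
}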
Upvotes: 0
Views: 281
Reputation: 76597
As an overarching answer, it's possible to accomplish what you are asking, assuming that the underlying sinks are compatible (e.g. they expose an invoke() method, so your own sink's invoke could simply iterate through the available sinks and call each one). In this case, the KafkaSink isn't as trivial as some others (covered further down), so that isn't as easy.
I think two options make sense here: one is pretty trivial (if applicable), and the other, not so much:
Pre-filtering the Kafka Data - Since you would only conditionally write the data to the Kafka sink, you could consider using a filter() operation to determine if the record should be written to Kafka at all:
// Sink records to your "other sink"
stream.sinkTo(someOtherSink);

// Conditionally sink records to Kafka
stream
    .filter(new YourKafkaFilterFunction())
    .sinkTo(kafkaSink);
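Here YourKafkaFilterFunction is just a placeholder for whatever decision you currently make inside your sink; a minimal sketch might look like:

import org.apache.flink.api.common.functions.FilterFunction;

// Placeholder filter: only records that should be written to Kafka pass through
public class YourKafkaFilterFunction implements FilterFunction<String> {

    @Override
    public boolean filter(String value) {
        // Replace with your actual "should this record go to Kafka?" check
        return value.startsWith("kafka:");
    }
}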
Writing a Custom Sink - If that isn't an option, you'd likely need to write a custom function that would mimic the behavior of the KafkaSink, which uses a KafkaWriter/KafkaCommitter behind the scenes and is not necessarily as trivial as other sinks (it's a TwoPhaseCommittingStatefulSink).
I'd highly recommend looking at the source for the KafkaSink itself in the respective repository to see what the implementation of that sink looks like if you want to extend it.
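If exactly-once delivery isn't a requirement, a simpler (at-least-once) alternative is to manage a plain KafkaProducer yourself inside a RichSinkFunction and write to it conditionally. The sketch below is only illustrative (broker address, topic and the helper methods are placeholders) and does not reproduce the KafkaSink's two-phase-commit behavior:

import java.util.Properties;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// At-least-once sketch only: a plain KafkaProducer inside a RichSinkFunction
public class ConditionalDualSink extends RichSinkFunction<String> {

    private transient KafkaProducer<String, String> producer;

    @Override
    public void open(Configuration parameters) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        producer = new KafkaProducer<>(props);
    }

    @Override
    public void invoke(String value, Context context) {
        writeToOtherDataSource(value);                              // placeholder for the "other" write
        if (shouldWriteToKafka(value)) {                            // your per-record decision
            producer.send(new ProducerRecord<>("my-topic", value)); // placeholder topic
        }
    }

    @Override
    public void close() {
        if (producer != null) {
            producer.flush();
            producer.close();
        }
    }

    // Placeholders for the other data source and the routing decision
    private void writeToOtherDataSource(String value) { }
    private boolean shouldWriteToKafka(String value) { return value.startsWith("kafka:"); }
}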
Upvotes: 0