Sid-Ant

Reputation: 297

In Apache Flink, how do I have a sink of sinks?

In Flink 1.14, how can I have a single sink that writes both to Kafka and to some other data store?

I've tried creating a custom sink that extends RichSinkFunction. In its open method, I also initialise a KafkaSink.

However, KafkaSink has no equivalent of an invoke method that I can call from the invoke method of my own custom sink.

The documentation says KafkaSink is used as stream.sinkTo(kafkaSink).

That approach wouldn't work for me because, inside the sink, I decide whether or not to write to Kafka.

Upvotes: 0

Views: 281

Answers (1)

Rion Williams

Reputation: 76597

As an overarching answer: it's possible to accomplish what you're asking, assuming the underlying sinks are compatible (i.e. they expose an invoke() method that your wrapper could call for each of its inner sinks). In this case, the KafkaSink isn't as trivial as some others (covered further down), so that isn't easy here.

I think two options make sense here: one is pretty trivial (if applicable), the other not so much.

Pre-filtering the Kafka Data - Since you would only conditionally write the data to the Kafka sink, you could consider using a filter() operation to determine if the record should be written to Kafka at all:

// Sink records to your "other sink"
stream.sinkTo(someOtherSink)

// Conditionally sink records to Kafka
stream
  .filter(YourKafkaFilterFunction())
  .sinkTo(kafkaSink)
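To illustrate the decision logic that would live in the filter, here's a minimal plain-Java sketch. The `Event` type, its `forKafka` flag, and the predicate are all hypothetical; in a real Flink job the same logic would go into an implementation of `org.apache.flink.api.common.functions.FilterFunction`, which Flink isn't required for here:

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical record type; in a real job this would be your stream's element type.
class Event {
    final String payload;
    final boolean forKafka; // routing decision computed upstream

    Event(String payload, boolean forKafka) {
        this.payload = payload;
        this.forKafka = forKafka;
    }
}

public class KafkaFilterSketch {
    // In Flink this predicate would be a FilterFunction<Event>: records
    // returning true continue on to the Kafka sink, the rest are dropped
    // from that branch only (the other sink still sees the full stream).
    static final Predicate<Event> SHOULD_GO_TO_KAFKA = e -> e.forKafka;

    public static void main(String[] args) {
        List<Event> stream = List.of(
                new Event("a", true),
                new Event("b", false),
                new Event("c", true));

        List<Event> kafkaBound = stream.stream()
                .filter(SHOULD_GO_TO_KAFKA)
                .collect(Collectors.toList());

        System.out.println(kafkaBound.size()); // prints 2
    }
}
```

Note that this branches the stream rather than making the decision inside a single sink, which is exactly why it sidesteps the missing invoke() problem.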

Writing a Custom Sink - If that isn't an option, you'd likely need to write a custom function that mimics the behavior of the KafkaSink, which uses a KafkaWriter/KafkaCommitter behind the scenes and is not as trivial as other sinks (it's a TwoPhaseCommittingStatefulSink).

I'd highly recommend looking at the source for the KafkaSink itself in the respective repository to see what the implementation of that sink looks like to extend it.
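For the shape such a "sink of sinks" would take, here's a simplified plain-Java sketch. `SimpleSink` is a stand-in for Flink's `SinkFunction` interface (the real one also carries a `Context`), and the inner sinks would need to be invokable `SinkFunction`s themselves, not the new `KafkaSink`, which exposes no invoke(); everything else (names, the routing predicate) is illustrative only:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Simplified stand-in for Flink's SinkFunction interface.
interface SimpleSink<T> {
    void invoke(T value);
}

// One sink that always delegates to an "other" sink and conditionally
// delegates to a Kafka-style sink, mirroring the decision the asker
// wants to make inside a single sink.
class ConditionalFanOutSink<T> implements SimpleSink<T> {
    private final SimpleSink<T> kafkaLikeSink;
    private final SimpleSink<T> otherSink;
    private final Predicate<T> writeToKafka;

    ConditionalFanOutSink(SimpleSink<T> kafkaLikeSink,
                          SimpleSink<T> otherSink,
                          Predicate<T> writeToKafka) {
        this.kafkaLikeSink = kafkaLikeSink;
        this.otherSink = otherSink;
        this.writeToKafka = writeToKafka;
    }

    @Override
    public void invoke(T value) {
        otherSink.invoke(value);          // every record goes here
        if (writeToKafka.test(value)) {   // only some records go to "Kafka"
            kafkaLikeSink.invoke(value);
        }
    }

    public static void main(String[] args) {
        List<String> kafka = new ArrayList<>();
        List<String> other = new ArrayList<>();
        ConditionalFanOutSink<String> sink =
                new ConditionalFanOutSink<>(kafka::add, other::add,
                        s -> s.startsWith("k"));
        sink.invoke("kafka-bound");
        sink.invoke("other-only");
        System.out.println(kafka.size() + " " + other.size()); // prints "1 2"
    }
}
```

A real implementation would also have to forward open(), close(), and the checkpointing hooks to the delegates, which is where mimicking the KafkaWriter/KafkaCommitter machinery gets non-trivial.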

Upvotes: 0
