Reputation: 3021
My requirement is to send each record to a different Elasticsearch (ES) sink depending on its content. For example, if the record contains a particular piece of information, send it to sink1; otherwise send it to sink2, and so on (that is, route each record dynamically to exactly one sink based on the data). I also want to set the parallelism separately for ES sink1, ES sink2, ES sink3, etc.
                              -> ES sink1 (parallelism 4)
Kafka -> Map(Transformations) -> ES sink2 (parallelism 2)
                              -> ES sink3 (parallelism 2)
Is there a simple way to achieve this in Flink?
My current solution (which I am not satisfied with):
I came up with a solution that writes to intermediate Kafka topics (topic1, topic2, topic3) and then runs a separate pipeline for each of ES sink1, ES sink2, and ES sink3. I want to avoid writing to these intermediate Kafka topics.
Kafka -> Map(Transformations) -> Kafka topics (write to topic1, topic2, or topic3 based on the data)
Kafka topic1 -> ES sink1 (parallelism 4)
Kafka topic2 -> ES sink2 (parallelism 2)
Kafka topic3 -> ES sink3 (parallelism 2)
Upvotes: 0
Views: 953
Reputation: 43499
You can use a ProcessFunction [1] with side outputs [2] to split the stream n ways, then connect each side-output stream to the appropriate sink and call setParallelism() [3] on each sink.
[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-processfunction
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
[3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html#operator-level
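A minimal sketch of that approach, assuming String records and the Flink DataStream Java API. The routing rule in route(), the class name, and the tag names are hypothetical; fromElements() stands in for the Kafka source plus the Map step, and print() stands in for the three Elasticsearch sinks so the wiring can be tried locally. Swap in your real source and sinks.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputRouting {
    // Created as anonymous subclasses so Flink can capture the element type.
    static final OutputTag<String> SINK2 = new OutputTag<String>("sink2") {};
    static final OutputTag<String> SINK3 = new OutputTag<String>("sink3") {};

    // Hypothetical routing rule: returns 1, 2, or 3 for the target sink.
    static int route(String record) {
        if (record.contains("error")) return 1;
        if (record.contains("warn")) return 2;
        return 3;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for: Kafka source -> Map(Transformations).
        DataStream<String> transformed =
                env.fromElements("error: disk full", "warn: slow query", "info: ok");

        // Split the stream: main output feeds sink1, side outputs feed sink2 and sink3.
        SingleOutputStreamOperator<String> routed = transformed.process(
                new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value, Context ctx,
                                               Collector<String> out) {
                        switch (route(value)) {
                            case 1: out.collect(value); break;
                            case 2: ctx.output(SINK2, value); break;
                            default: ctx.output(SINK3, value);
                        }
                    }
                });

        // Each sink gets its own parallelism; print() is a placeholder for an ES sink.
        routed.print("sink1").setParallelism(4);
        routed.getSideOutput(SINK2).print("sink2").setParallelism(2);
        routed.getSideOutput(SINK3).print("sink3").setParallelism(2);

        env.execute("side-output-routing");
    }
}
```

With real sinks, the three print() calls would become addSink(...) calls with the corresponding Elasticsearch sink instances; setParallelism() is applied to the returned sink handle in the same way.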
Upvotes: 3