Reputation: 21
I am using FlinkKafkaConsumer010 with Flink 1.2.0, and the problem I am facing is that: Is there a way that I can shut down the entire pipeline programmatically if some scenario is seen?
On possible solution is that I can shut down the kafka consumer source by calling the close() method defined inside of FlinkKafkaConsumer010, then the pipeline with shut down as well. For this approach, I create a list that contains the references to all FlinkKafkaConsumer010 instance that I created at the beginning of the pipeline for the kafka topics. Then during the execution of the pipeline, I have another thread that calls close() of each of the FlinkKafkaConsumer010 in my list. I expect that this should shut down the consumer, but the result is that the consumer is still running.
Can someone shed some light on this or give me some other suggestion on how can I shut down the flink pipeline at runtime programmatically?
Upvotes: 0
Views: 1456
Reputation: 41
Is the scenario that you're trying to respond to based on the input events? If so, I would suggest to have a MapFunction somewhere appropriate in the pipeline, and just deliberately throw an exception to fail the job when some condition is met.
The other alternative is to look at the isEndOfStream
method in KeyedDeserializationSchema
. Basically, when the condition is met for some event, signal that the stream has ended.
One other option to consider is to let the MapFunction mentioned above be instead a FlatMapFunction, that send an signaling event to the outside world. A separate process external to the Flink job listens to that event, and when received, shutdown the Flink job via the Flink CLI.
Upvotes: 2