Mazen Ezzeddine

Flink timeout using KeyedCoProcessFunction and order of reading for FlinkKafkaConsumer

I am using the KeyedCoProcessFunction class in the Flink DataStream API to implement a timeout-like use case. The scenario is as follows: I have an input Kafka topic and an output Kafka topic. A service reads from the input topic, processes each element (which takes a variable amount of time), and then publishes the response to the output Kafka topic.

Now, to implement the timeout (it must use the Flink DataStream API), I have one FlinkKafkaConsumer that reads from the Kafka input topic and another FlinkKafkaConsumer that reads from the Kafka output topic. I connect the two streams, and in processElement1 I register a timer and then wait for one of two things: either onTimer fires (a timeout is declared), or processElement2 fires first, in which case I delete the timer and do not declare a timeout.

While testing with a large number of events, I am seeing NullPointerExceptions, and I suspect that processElement2 is sometimes called before processElement1. In the scenario above, can an element be read from the output topic before the corresponding element is read from the input topic, for any reason (given that the service may take seconds to process an element)? If so, what is the best way to implement the timeout functionality described above strictly with the Flink DataStream API? Any hints would be appreciated.

Thank you.

Answers (1)

David Anderson

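Yes, there's no guarantee that Flink will call processElement1 before processElement2 for a given key. It's likely, but not certain: the two Kafka sources are consumed independently, so one of them can fall behind the other (for example, under backpressure or while catching up after a restart). Your code has to handle the two events arriving in either order.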
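The question doesn't show the code, so this is only a guess at where the NullPointerException could come from. Assume processElement1 stores the timer timestamp in a ValueState<Long> and processElement2 reads it back into a primitive long so it can delete the timer. ValueState.value() returns null when nothing has been stored for the current key, so an output event that arrives first makes the unboxing throw. The sketch below is a hypothetical, Flink-free reconstruction: the class NaiveTimeout is an illustrative name, and the map stands in for the ValueState.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical reconstruction of a naive, order-dependent handler; not the asker's code.
// The map stands in for a ValueState<Long> holding the registered timer timestamp.
class NaiveTimeout {
    private final Map<String, Long> timerState = new HashMap<>();

    // Analogue of processElement1: remember when the timer will fire.
    void processElement1(String key, long now, long timeoutMs) {
        timerState.put(key, now + timeoutMs);
    }

    // Analogue of processElement2: silently assumes processElement1 already ran for this key.
    long processElement2(String key) {
        long timer = timerState.get(key); // unboxing null throws NullPointerException
        timerState.remove(key);           // in Flink: delete the timer, then clear the state
        return timer;
    }
}
```

When the input event comes first this works. When the output event comes first, `timerState.get(key)` is null and the assignment to `long` fails.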

This is exactly the same scenario as you will find in the LongRideAlerts exercise in the Flink Training -- https://github.com/apache/flink-training/tree/master/long-ride-alerts -- so you might emulate the solution used there. The logic is this:

  • whichever event (for a given key) arrives first, from either stream, store it
  • if it is the input event (from topic1) that arrives first, set a timer
  • if it is the output event (from topic2) that arrives second, before the timer fires, delete the timer
  • whichever event (for a given key) arrives second, from either stream, clear the state (that is shared by the two streams)
  • whenever the timer fires, use the input event saved from topic1 in keyed state to create a report and clear the state
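The steps above can be sketched as follows. To keep it runnable without a Flink dependency, this is a plain-Java simulation rather than a real KeyedCoProcessFunction. The class TimeoutMatcher and its methods are hypothetical names, the maps and set stand in for keyed state, and the `timers` map stands in for processing-time timers. In a real job the state would be ValueState fields of a KeyedCoProcessFunction, so the key would be implicit. onInput, onOutput and advanceTo would correspond to processElement1, processElement2 and onTimer. The timer would be registered with `ctx.timerService().registerProcessingTimeTimer(...)` and removed with `deleteProcessingTimeTimer(...)`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, Flink-free simulation of the order-independent logic.
// The collections stand in for per-key state; "timers" stands in for
// processing-time timers registered through the timer service.
class TimeoutMatcher {
    private final long timeoutMs;
    // Input events (topic1) that arrived first and are waiting for their output event.
    private final Map<String, String> pendingInput = new HashMap<>();
    // Keys whose output event (topic2) arrived before the input event.
    private final Set<String> earlyOutput = new HashSet<>();
    // Registered timers: key -> time at which the timeout fires.
    private final Map<String, Long> timers = new HashMap<>();
    // Reports produced when a timer fires.
    final List<String> timeoutReports = new ArrayList<>();

    TimeoutMatcher(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Analogue of processElement1 (event from the input topic).
    void onInput(String key, String payload, long now) {
        if (earlyOutput.remove(key)) {
            // Input arrived second: the pair is complete, so the state is cleared.
            return;
        }
        // Input arrived first: store it and set a timer.
        pendingInput.put(key, payload);
        timers.put(key, now + timeoutMs);
    }

    // Analogue of processElement2 (event from the output topic).
    void onOutput(String key) {
        if (pendingInput.remove(key) != null) {
            // Output arrived second, before the timer fired: delete the timer.
            timers.remove(key);
        } else {
            // Output arrived first: store it so the later input is recognized.
            earlyOutput.add(key);
        }
    }

    // Analogue of onTimer: fire every timer whose time has been reached.
    void advanceTo(long now) {
        Iterator<Map.Entry<String, Long>> it = timers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> timer = it.next();
            if (timer.getValue() <= now) {
                String key = timer.getKey();
                // Use the stored input event to build the report, then clear the state.
                timeoutReports.add("timeout " + key + ": " + pendingInput.remove(key));
                it.remove();
            }
        }
    }
}
```

For example, with a 1000 ms timeout, an output event for key `b` that arrives before its input event produces no report and no exception. An input event for key `c` at time 20 that never gets an output produces `timeout c: req-c` once advanceTo is called with a time of 1020 or later. An output event that arrives after its timeout has already fired is treated like an early output and stays in state, which is one more case covered by the state TTL advice.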

This solution does leak state whenever an input event from topic1 is missing, because the output event stored for that key is never cleared. See the discussion accompanying the training exercise for how to deal with that, if it matters (short version: use state TTL).
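To illustrate what state TTL buys you here, the sketch below models its semantics in plain Java. A value is timestamped when written and treated as absent once it is older than the TTL. Cleanup physically removes expired entries, so a stored output event whose input never arrives cannot pile up forever. The class TtlState is a hypothetical name and not Flink's implementation. In Flink you would instead build a StateTtlConfig and attach it to the ValueStateDescriptor with `enableTimeToLive(...)`. As far as I know, its defaults (update on create and write, never return expired values) match the behaviour modeled here. Check the state documentation for the Flink version you use.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical model of TTL semantics for per-key state; not Flink's implementation.
class TtlState<V> {
    private final long ttlMs;
    private final Map<String, V> values = new HashMap<>();
    private final Map<String, Long> writtenAt = new HashMap<>();

    TtlState(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    // Writing a value (re)starts its TTL clock.
    void update(String key, V value, long now) {
        values.put(key, value);
        writtenAt.put(key, now);
    }

    // Expired values are never returned, even before cleanup has removed them.
    V value(String key, long now) {
        Long t = writtenAt.get(key);
        if (t == null || now - t >= ttlMs) {
            return null;
        }
        return values.get(key);
    }

    // Cleanup: physically drop every expired entry.
    void purgeExpired(long now) {
        Iterator<Map.Entry<String, Long>> it = writtenAt.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (now - e.getValue() >= ttlMs) {
                values.remove(e.getKey());
                it.remove();
            }
        }
    }

    int size() {
        return values.size();
    }
}
```

The TTL should be comfortably longer than the worst-case processing time of the service plus the timeout. Otherwise an early output event could expire before its input arrives, and that input would then be timed out spuriously.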
