Jicaar
Jicaar

Reputation: 1104

Is there a better way to joining updates from two keyed streams than time window I am doing?

So what I have is a Flink job that receives a message from Kafka and will create two updateObject events that are then sent through the job. For simplicity, one is an increase event and the other is a decrease event (So message comes in, business logic dictates what updateObjects to create so state is updated properly).

To give a sample updateObject:

{
   "batchId":"b1",
   "type":"increase",
   "amount":1,
   "stateToUpdate":"stateAlpha",
   "stateToUpdateValue":null
}

The updateObjects are then sent downstream in the job. Stream is keyed by a value which will have updateObject1 update one state and updateObject2 update a different one by the key. After each updates their state the updated value is in put into the updateObject's stateToUpdateValue and each is sent along through the job.

Now the piece that is tricky is the last step. The final output of the job is an array of the updateObjects from each message. The best idea that could be thought of was to have a tumbling time window of 1 second that collects updateObjects and then when it triggers after 1 second it will check all that are in its window and pair the ones with the same batchId and put them in the final output object and then output it. Obviously this is not one not guaranteeing all may have gotten to that window in the 1 second time, but it also causes a delay in processing since things just sit there.

There is not guarantee that there is always two updateObjects created for every message since its very much a case-by-case type of thing. And since the updateObjects get split into different keyed streams because their keys are always different, it can't be that a single object goes through the first keyed state and updates it and then goes through the next one with a single object that gets updated accordingly for each. Once the keying happens they aren't attached so to speak.

So I wanted to know if anyone could think of a better way to do this as I feel like there definitely has to be.

Upvotes: 0

Views: 334

Answers (1)

David Anderson
David Anderson

Reputation: 43499

You are right to suspect that there's a better way to do this.

Doing this with tumbling windows has two problems:

  1. You are going to wait half a second, on average, and one second at worst, for something that typically won't take that long.
  2. Despite the fact that you're often spending all this time waiting around, this approach based on windows can still rather easily get things wrong. Even if a batch has only two events, and even if they are processed within a few milliseconds of each other, they can still fall on opposite sides of the window boundaries. This is because a one-second-long window is going to be from 12:00:00.000 to 12:00:00:999, for example, and your events might show up at 12:00:00.999 and 12:00:01.001.

Here's an alternative:

At the end of your pipeline, re-key the stream by the batchId, and then use a KeyedProcessFunction to glue the batches back together.

For example, as each record belonging to a specific batch arrives, append it to a keyed ListState object. If you know exactly how many records belong to each batch, then you can keep a counter in ValueState, and when the batch is complete, you can iterate over the list and produce the final result (and don't forget to clear the state when you're finished with it). Or you can use a keyed Timer to wait for some specific duration (relative to the arrival of the first record from each batch), and produce the final result when the Timer fires.

There are examples of working with state and with process functions in the tutorials in Flink's documentation, and in the accompanying training exercises.

Alternatively, you could do this with Flink SQL, using OVER windows.

Upvotes: 1

Related Questions