Reputation: 13
How would the two scenarios compare or differ?
I) stream1.connect(stream2).flatMap(new connectFunction())
flatMap1 of connectFunction will process input from stream1, and flatMap2 will process input from stream2.
II) stream2.connect(stream1).flatMap(new connectFunction())
flatMap1 of connectFunction will process input from stream2, and flatMap2 will process input from stream1.
Upvotes: 0
Views: 424
Reputation: 437
As David mentioned, there's no significant difference. But in case you're asking because you're worried about the order in which the inputs arrive, keep in mind that there is no guarantee whatsoever that one stream (for example, your "control stream" if you're doing some kind of "dynamic filter") will actually be processed before the stream you want to filter. One stream can fall behind the other because of Kafka consumer lag or many other reasons.
As such, it is a common pattern to set up state for both streams so that you can keep items in a "pending processing" state until you receive what you need from the other stream.
For instance, suppose you have a control stream called control that contains items from a blocklist/disallowlist, and you would like to use it to filter an input stream called input. After keying both by a common key, it won't matter whether you do input.connect(control) or control.connect(input). The important point, however, is that the control stream may not have produced anything by the time you receive your first element from input. For that reason you would set up state (for example a ListState[InputType]) to keep track of all input items that arrive before you have received what you need from control. In other words, you'd be keeping track of "pending items". Once you do receive the required item from control, you process all "pending" items from the ListState.
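The buffering idea can be sketched in plain Java without any Flink dependency. This is only an illustration under stated assumptions: the class and method names (PendingBufferSketch, onInput, onControl) are hypothetical, the two maps stand in for keyed Flink state, and the control stream is assumed to carry an explicit allow/block decision per key. In a real job the same logic would most likely live in the flatMap1 and flatMap2 methods of a RichCoFlatMapFunction, with the maps replaced by ValueState and ListState.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for a keyed two-input function. All names are hypothetical.
class PendingBufferSketch {
    // Stand-in for a keyed ValueState<Boolean>: the control decision per key, if seen.
    private final Map<String, Boolean> blockedByKey = new HashMap<>();
    // Stand-in for a keyed ListState<InputType>: input items waiting for a decision.
    private final Map<String, List<String>> pendingByKey = new HashMap<>();
    // Stand-in for the Collector: everything emitted downstream.
    private final List<String> output = new ArrayList<>();

    // Plays the role of flatMap1: called for each element of the input stream.
    void onInput(String key, String item) {
        Boolean blocked = blockedByKey.get(key);
        if (blocked == null) {
            // No control record for this key yet: keep the item as "pending".
            pendingByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(item);
        } else if (!blocked) {
            output.add(item);
        }
        // If blocked is true, the item is dropped.
    }

    // Plays the role of flatMap2: called for each element of the control stream.
    void onControl(String key, boolean blocked) {
        blockedByKey.put(key, blocked);
        // Drain the pending items for this key now that the decision is known.
        List<String> pending = pendingByKey.remove(key);
        if (pending != null && !blocked) {
            output.addAll(pending);
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        PendingBufferSketch fn = new PendingBufferSketch();
        fn.onInput("user-1", "a");      // control not seen yet: buffered
        fn.onInput("user-2", "b");      // buffered
        fn.onControl("user-1", false);  // allowed: flushes "a"
        fn.onInput("user-1", "c");      // decision known: emitted immediately
        fn.onControl("user-2", true);   // blocked: pending "b" is dropped
        System.out.println(fn.output()); // prints [a, c]
    }
}
```

Because each callback checks what the other side has already stored, the result for a key does not depend on which stream's element happens to be delivered first. Note that with a pure blocklist, keys that never appear on the control stream would stay pending forever, so a real job would probably also need a timer or TTL to release or expire them.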
This is documented in the Learn Flink > ETL section, in the last paragraph of the Connected streams section:
It is important to recognize that you have no control over the order in which the flatMap1 and flatMap2 callbacks are called. These two input streams are racing against each other, and the Flink runtime will do what it wants to regarding consuming events from one stream or the other. In cases where timing and/or ordering matter, you may find it necessary to buffer events in managed Flink state until your application is ready to process them. (Note: if you are truly desperate, it is possible to exert some limited control over the order in which a two-input operator consumes its inputs by using a custom Operator that implements the InputSelectable interface.)
Upvotes: 1
Reputation: 43707
That’s all there is to it. There’s no significant difference.
Upvotes: 1