nicognito

Reputation: 67

Joins between multiple streams on the same keys

Flink community!

I have a question about joining multiple streams on the same keys in Flink (equi-joins). I'm still new to Flink and am evaluating it for my team, with a view to migrating our Spark batch-processing application to stream processing.

Note: I have read Fabian Hüske's article on join processing, Peeking into Apache Flink's Engine Room.

To simplify the question, assume there are 3 streams, each containing unique records that can be keyed by an id field. Every record in one stream has a matching record in each of the other streams. The goal is to join the streams on the id field.

Questions:

(In Spark, I believe the results of the previous join would not be shuffled again, provided the keys do not change and the same hash partitioner is used.)

To put it differently: my understanding is that the first operator keeps state for streams #1 and #2, and the second operator keeps state for streams #1, #2 and #3. What I observed is that the state size of my first operator is large (my experiment covers one year of data), but the state size of the second operator is much, much larger. The final checkpoint size seems to be the state size of the #1/#2 join plus the state size of the #1/#2/#3 join. Shouldn't the second operator only add the size of #3, if the #1/#2 join data is the same?
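The state accounting described above can be reproduced with a toy model of the cascaded topology join(join(#1, #2), #3). This is a sketch under stated assumptions, not Flink code: `BufferingJoin` and `run` are hypothetical names, each join is assumed to buffer both inputs per key and never evict anything (roughly what a window join does until its window closes), and record sizes are made-up byte counts.

```python
from collections import defaultdict


class BufferingJoin:
    """Hypothetical two-input equi-join that buffers both inputs per key (toy model)."""

    def __init__(self):
        self.left = defaultdict(list)   # key -> buffered left-side records
        self.right = defaultdict(list)  # key -> buffered right-side records

    def process_left(self, key, rec):
        self.left[key].append(rec)
        # Emit one joined record per buffered right-side match.
        return [(key, rec + r) for r in self.right[key]]

    def process_right(self, key, rec):
        self.right[key].append(rec)
        return [(key, lrec + rec) for lrec in self.left[key]]

    def state_bytes(self):
        # Records are bytes, so len() is their size; nothing is ever evicted here.
        return sum(len(r) for side in (self.left, self.right)
                   for recs in side.values() for r in recs)


def run(n_ids, size1=100, size2=100, size3=100):
    """Feed n_ids ids through join(join(#1, #2), #3); return each operator's state size."""
    join12, join123 = BufferingJoin(), BufferingJoin()
    for key in range(n_ids):
        joined = join12.process_left(key, b"a" * size1)    # record from stream #1
        joined += join12.process_right(key, b"b" * size2)  # record from stream #2
        for k, rec in joined:
            join123.process_left(k, rec)                   # #1/#2 result is buffered again
        join123.process_right(key, b"c" * size3)           # record from stream #3
    return join12.state_bytes(), join123.state_bytes()


if __name__ == "__main__":
    s12, s123 = run(1000)
    print(s12, s123, s12 + s123)  # 200000 300000 500000
```

With three equally sized streams, the second operator's state is 1.5 times the first, and the total (500000 bytes) stores the #1/#2 payload twice even though the raw input is only 300000 bytes.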

Thanks, Nicolas

Upvotes: 0

Views: 688

Answers (1)

Arvid Heise

Reputation: 3634

Currently, each join on streams in Flink requires a complete shuffle, including the serialization and deserialization you mentioned. The main reason is that Flink cannot chain an operator with two inputs to the preceding operator. There is work in progress to support N-input operators, which would avoid exactly this additional shuffle in your use case.
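To illustrate the point, here is a small simulation of the two keyed edges. It assumes a simplified partitioner, `hash(key) % parallelism`; Flink's real assignment goes through key groups, but it is likewise deterministic in the key, which is all this sketch relies on. `network_edge`, `join_on_subtask` and `run` are hypothetical helpers, and `pickle` merely stands in for Flink's serializers. The second shuffle routes every joined record back to the subtask it already sits on, yet every record is still serialized and deserialized because the edge is not chained.

```python
import pickle

PARALLELISM = 4


def partition(key, parallelism=PARALLELISM):
    # Simplified stand-in for Flink's key-group assignment: deterministic in the key.
    return hash(key) % parallelism


def network_edge(records, parallelism=PARALLELISM):
    """Non-chained edge: serialize each record, route it by key, deserialize it."""
    channels = [[] for _ in range(parallelism)]
    serialized_bytes = 0
    for key, value in records:
        payload = pickle.dumps((key, value))
        serialized_bytes += len(payload)
        channels[partition(key, parallelism)].append(pickle.loads(payload))
    return channels, serialized_bytes


def join_on_subtask(records):
    """Toy join: one output record per id, concatenating that id's values."""
    by_key = {}
    for key, value in records:
        by_key.setdefault(key, []).append(value)
    return [(key, "+".join(sorted(values))) for key, values in by_key.items()]


def run(n_ids=100):
    stream1 = [(i, "s1") for i in range(n_ids)]
    stream2 = [(i, "s2") for i in range(n_ids)]
    channels, _ = network_edge(stream1 + stream2)  # shuffle in front of join #1
    moved = 0
    joined = []
    for subtask, records in enumerate(channels):
        for key, value in join_on_subtask(records):
            # Same key and same partitioner, so join #2 picks the same subtask.
            if partition(key) != subtask:
                moved += 1
            joined.append((key, value))
    _, cost = network_edge(joined)  # shuffle in front of join #2
    return moved, len(joined), cost


if __name__ == "__main__":
    moved, n_joined, cost = run()
    print(moved, n_joined, cost > 0)  # 0 100 True
```

No record changes subtask at the second edge, which is why the shuffle looks redundant; the serialization cost is paid regardless.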

Your join operators each maintain their state independently. That means your second join holds all joined records plus all records from stream #3. If your first join has a cardinality of 1, the second join therefore has a larger state than the first. The reason for this seemingly redundant replication is that with time windows (usually the only practical way to join streams), the two operators may be at different points in time, so the first operator may already have removed entries from its state by the time the second operator processes them.
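The timing argument can be sketched as follows. The assumptions are loud: tumbling windows of `WINDOW` time units; each operator purges a window as soon as its own watermark reaches the window end (a simplification, not Flink's exact cleanup rule, which also involves allowed lateness); and join #2's watermark lags because stream #3 is slower. `WindowedState` and `scenario` are hypothetical names. If join #2 relied on join #1's state instead of keeping its own copy, the lookup would fail.

```python
WINDOW = 10  # tumbling window length in event-time units (arbitrary)


def window_end(ts):
    return (ts // WINDOW + 1) * WINDOW


class WindowedState:
    """Hypothetical per-operator keyed window state, purged by that operator's watermark."""

    def __init__(self):
        self.store = {}  # (key, window_end) -> record

    def put(self, key, ts, rec):
        self.store[(key, window_end(ts))] = rec

    def advance_watermark(self, watermark):
        # Simplification: drop a window once the watermark reaches its end.
        self.store = {k: v for k, v in self.store.items() if k[1] > watermark}

    def get(self, key, ts):
        return self.store.get((key, window_end(ts)))


def scenario():
    key, ts, joined12 = 42, 3, "rec1+rec2"  # #1/#2 result in window [0, 10)
    join1_state = WindowedState()  # state of join #1
    join2_state = WindowedState()  # join #2's own copy of the #1/#2 result
    join1_state.put(key, ts, joined12)
    join2_state.put(key, ts, joined12)

    join1_state.advance_watermark(25)  # join #1 is ahead and purges window [0, 10)
    join2_state.advance_watermark(8)   # join #2 lags, waiting on slower stream #3

    # The matching #3 record for the same window now reaches join #2.
    return join1_state.get(key, ts), join2_state.get(key, ts)


if __name__ == "__main__":
    print(scenario())  # (None, 'rec1+rec2')
```

Only the copy held by join #2 survives, which is why each operator keeps its own state.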

Upvotes: 1
