Reputation: 11
I am trying to join two KTables, and the join emits the same message twice. It seems the ValueJoiner is invoked twice during this operation.
How can I address this so that the join emits only a single message?
KTable<ID, Message> joinedMsg = msg1.join(msg2, new MsgJoiner());
I receive two identical messages as the result of the join between the two KTables (msg1 and msg2).
Upvotes: 1
Views: 1398
Reputation: 20880
This behaviour is usually seen when caching is enabled.
If there are updates to the same key in both tables, each table is flushed independently, and therefore each table will trigger the join, so you get two results for the same key.
For example, suppose there are two tables, table1 and table2, and they receive the following input:
table1 A:1
table2 A:A
When the stores are flushed on the commit interval, the store for table1 is flushed first, which triggers the join and produces A:1:A. Then the store for table2 is flushed, which triggers the join again and produces A:1:A a second time.
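To make the sequence concrete, here is a toy model in plain Java. It is only a sketch of the idea, not how Kafka Streams is implemented: the `Table` class, its `visible` and `dirty` maps, and `flushAndJoin` are all hypothetical names made up for the illustration. Each table keeps its latest values readable for lookups and remembers which keys still have to be forwarded; flushing one table forwards those keys and joins them against the other table.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CachedJoinSketch {

    // Hypothetical toy table: latest values plus the keys not yet forwarded downstream.
    static final class Table {
        final Map<String, String> visible = new LinkedHashMap<>();
        final Map<String, String> dirty = new LinkedHashMap<>();

        void put(String key, String value) {
            visible.put(key, value); // readable by the other side's lookup right away
            dirty.put(key, value);   // forwarded only when this table is flushed
        }
    }

    // Flush one table: every dirty key is joined against the other table's latest value.
    static List<String> flushAndJoin(Table flushed, Table other, boolean flushedIsLeft) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> e : flushed.dirty.entrySet()) {
            String otherValue = other.visible.get(e.getKey());
            if (otherValue == null) {
                continue; // inner join: no match on the other side, no output
            }
            String left = flushedIsLeft ? e.getValue() : otherValue;
            String right = flushedIsLeft ? otherValue : e.getValue();
            out.add(e.getKey() + ":" + left + ":" + right);
        }
        flushed.dirty.clear();
        return out;
    }

    // Replays the example: both tables get an update for key A before the commit.
    static List<String> run() {
        Table table1 = new Table();
        Table table2 = new Table();
        table1.put("A", "1");
        table2.put("A", "A");

        List<String> results = new ArrayList<>();
        results.addAll(flushAndJoin(table1, table2, true));  // flush table1
        results.addAll(flushAndJoin(table2, table1, false)); // flush table2
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [A:1:A, A:1:A]
    }
}
```

Both flushes see the other table's latest value, so each one emits the same joined record.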
You can try disabling the cache by setting cache.max.bytes.buffering=0.
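A minimal sketch of how that setting could be placed in the properties handed to the streams application. The application id and bootstrap server below are placeholder values I made up, and the keys are written as plain strings so the snippet compiles without the Kafka jars; in a real project you would more likely use the corresponding `StreamsConfig` constants.

```java
import java.util.Properties;

public class DisableCacheConfig {

    // Builds the streams properties with record caching switched off.
    static Properties buildStreamsProperties() {
        Properties props = new Properties();
        // Placeholder values, assumed for illustration only.
        props.put("application.id", "ktable-join-example");
        props.put("bootstrap.servers", "localhost:9092");
        // The setting from the answer: a cache size of 0 bytes disables caching.
        props.put("cache.max.bytes.buffering", "0");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildStreamsProperties();
        System.out.println("cache.max.bytes.buffering=" + props.getProperty("cache.max.bytes.buffering"));
    }
}
```

These properties would then be passed to the `KafkaStreams` constructor together with the topology.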
P.S. There is already an open issue for this behaviour in KTable/KTable joins.
Upvotes: 4