Reputation: 9761
I am trying to understand the workaround that is mentioned in:
https://issues.apache.org/jira/browse/KAFKA-3705
as in
Today in Kafka Streams DSL, KTable joins are only based on keys. If users want to join a KTable A by key a with another KTable B by key b but with a "foreign key" a, and assuming they are read from two topics which are partitioned on a and b respectively, they need to do the following pattern:
tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' is partitioned on "a" tableA.join(tableB', joiner);
I have a hard time to understand what is exactly happening.
In particular that sentence is confusing: "If users want to join a KTable A by key a with another KTable B by key b but with a "foreign key" a". Also i do not understand the code above either.
Can someone clarify a bit what is happening here ?
This is also mentioned here:
Close the gap between the semantics of KTables in streams and tables in relational databases. It is common practice to capture changes as they are made to tables in a RDBMS into Kafka topics (JDBC-connect, Debezium, Maxwell). These entities typically have multiple one-to-many relationship. Usually RDBMSs offer good support to resolve this relationship with a join. Streams falls short here and the workaround (group by - join - lateral view) is not well supported as well and is not in line with the idea of record based processing. https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
What means (group by - join - lateral view) ? I suspect it is related to the code above, but again a bit hard to follow. Could any one shed some light on this ?
Upvotes: 1
Views: 1993
Reputation: 20810
Well, the below code is the pseudocode for joining two KTables with non-key join:
tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' is partitioned on "a"
tableA.join(tableB', joiner);
Explanation :
Let's say, tableA has a key field "a". In order to join another ktable with tableA, it should be co-partitioned. It should have same keys. Hence we will rekey the ktable tableB with field "a"
tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' is partitioned on "a"
groupBy()
is shorthand for selectKey()+ groupByKey()
operation.
groupBy(/* select on field "a" */)
will rekey the tableB on field "a" and group by that key. Hence now you have a KGroupedTable having field "a" as key. In order to get KTable, you need to call .aggregate() on this. That is what happening in above code.
P.S. .agg()
should be renamed with .aggregate()
Once tableB' is ready, you can join with tableA using below code.
tableA.join(tableB', joiner);
Here joiner refers to ValueJoiner
implementation.
Example :
// Java 8+ example, using lambda expressions
KTable<String, String> joined = left.join(right,
/* Below line is ValueJoiner */
(leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue
);
At the moment, this is the way for Non-key join on KTables You can find nice explanation in documents : https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#ktable-ktable-join
Upvotes: 1