I am trying to inner join two KTables.
Tables a and b:
create table a_table(r string, time string) with (Kafka_topic='a', Key='r', Value_format='json');
create table b_table(r string, time string) with (Kafka_topic='b', Key='r', Value_format='json');
Inner join tables a and b on the key r:
create table ab_table as select * from a_table inner join b_table on a_table.r = b_table.r emit changes;
Use case 1: insert new data slowly
ksql> insert into a_table values('1','1', 'timeA');
-- wait 5 seconds
ksql> insert into b_table values('1','1', 'timeB');
select * from ab_table emit changes;
-- returns 1 row
print AB_TABLE from beginning;
-- returns 1 row
Use case 2: insert new data quickly (both inserts on one line)
ksql> insert into a_table values('2','2', 'timeA');insert into b_table values('2','2', 'timeB');
ksql> print a from beginning;
Key format: KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 5/23/20 4:44:06 PM UTC, key: 2, value: {"R":"2","TIME":"timeA"}
ksql> print b from beginning;
Key format: KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 5/23/20 4:44:06 PM UTC, key: 2, value: {"R":"2","TIME":"timeB"}
select * from ab_table emit changes;
-- returns 1 row
print AB_TABLE from beginning;
-- returns 2 rows
rowtime: 5/23/20 4:44:06 PM UTC, key: 2, value: {"A_TABLE_ROWTIME":1590252246657,"A_TABLE_ROWKEY":"2","A_TABLE_R":"2","A_TABLE_TIME":"timeA","B_TABLE_ROWTIME":1590252246657,"B_TABLE_ROWKEY":"2","B_TABLE_R":"2","B_TABLE_TIME":"timeB"}
rowtime: 5/23/20 4:44:06 PM UTC, key: 2, value: {"A_TABLE_ROWTIME":1590252246680,"A_TABLE_ROWKEY":"2","A_TABLE_R":"2","A_TABLE_TIME":"timeA","B_TABLE_ROWTIME":1590252246680,"B_TABLE_ROWKEY":"2","B_TABLE_R":"2","B_TABLE_TIME":"timeB"}
What the hell is going on? Why do I get two duplicate rows in the topic in the second use case?
Update: info about the topics / tables
Name : B_TABLE
Field | Type
-------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
R | VARCHAR(STRING)
TIME | VARCHAR(STRING)
Name : A_TABLE
Field | Type
-------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
R | VARCHAR(STRING)
TIME | VARCHAR(STRING)
Name : AB_TABLE
Field | Type
---------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
A_TABLE_ROWTIME | BIGINT
A_TABLE_ROWKEY | VARCHAR(STRING)
A_TABLE_R | VARCHAR(STRING)
A_TABLE_TIME | VARCHAR(STRING)
B_TABLE_ROWTIME | BIGINT
B_TABLE_ROWKEY | VARCHAR(STRING)
B_TABLE_R | VARCHAR(STRING)
B_TABLE_TIME | VARCHAR(STRING)
topic "a" with 1 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
topic "b" with 1 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
topic "AB_TABLE" with 1 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
Worked out what's going on here. It's to do with buffering.
By default, ksqlDB buffers the input from the two source table changelogs, i.e. topics a and b. (This buffering can be useful for compacting several messages that report changes to the same key into a single output.)
When you fire updates into both tables at once, the buffering means that both tables are already populated by the time the buffer is flushed. An update on either side of a table-table join can produce an output, and here each of the two input events finds a match on the other side, so the join writes two outputs to topic AB_TABLE.
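To make the mechanism concrete, here is a toy model in Python. It is not ksqlDB's or Kafka Streams' actual implementation: it simply assumes that a buffer flush is equivalent to "apply every table update first, then run the join lookup for each update". The function name `inner_join_outputs` is hypothetical.

```python
# Toy model (NOT ksqlDB's actual implementation) of an inner table-table join.
# Each side keeps its latest value per key; an update on either side looks up
# the current value on the other side and emits a joined row only on a match.

def inner_join_outputs(events, buffered):
    """events: list of (side, key, value) tuples, where side is "a" or "b".

    buffered=False: each event updates its table and is joined immediately.
    buffered=True:  all events update the tables first (assumed to mimic a
                    cache flushing them together), then each event is joined
                    against the already-populated tables.
    """
    tables = {"a": {}, "b": {}}
    other = {"a": "b", "b": "a"}
    outputs = []

    def join(side, key):
        # Inner join: emit only if the other side already has this key.
        if tables[other[side]].get(key) is not None:
            outputs.append((key, tables["a"][key], tables["b"][key]))

    if buffered:
        for side, key, value in events:
            tables[side][key] = value
        for side, key, _ in events:
            join(side, key)
    else:
        for side, key, value in events:
            tables[side][key] = value
            join(side, key)
    return outputs


events = [("a", "2", "timeA"), ("b", "2", "timeB")]
print(inner_join_outputs(events, buffered=False))  # one joined row
print(inner_join_outputs(events, buffered=True))   # the same joined row twice
```

In the unbuffered run the update to `a` finds nothing in `b`, so only the later update to `b` produces a row. In the buffered run both lookups see the other side already populated, which matches the two rows seen in `PRINT AB_TABLE`.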
PRINT AB_TABLE is correctly showing both of the rows in the changelog.
However, SELECT * FROM AB_TABLE EMIT CHANGES is also buffering its input, and this buffering compacts the two changes into a single output.
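The compaction step can be pictured with another toy model. It is an assumption-laden sketch, not Kafka Streams' real record cache (which also flushes on size limits and commits): within one flush, a later record for a key simply overwrites the earlier one. The function name `flush_compacted` and the row labels are hypothetical.

```python
# Toy model (NOT the real Kafka Streams record cache) of compaction by key:
# later updates to a key overwrite earlier ones until the cache is flushed.

def flush_compacted(records):
    """records: list of (key, value) pairs in arrival order.

    Returns one (key, value) per key, keeping the latest value, ordered by
    each key's first arrival (dicts preserve insertion order in Python 3.7+).
    """
    cache = {}
    for key, value in records:
        cache[key] = value  # overwrite: only the newest value per key survives
    return list(cache.items())


# The two changelog rows for key "2", labelled by their A_TABLE_ROWTIME.
changelog = [("2", "row@1590252246657"), ("2", "row@1590252246680")]
print(flush_compacted(changelog))  # a single row for key "2"
```

Two changelog entries for the same key arrive within one flush, so the push query only ever sees the newer one.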
Buffering can be controlled via the cache.max.bytes.buffering setting. For example, you can turn off buffering with:
-- turn off buffering:
SET 'cache.max.bytes.buffering' = 0;
I ran your example again after running the above, and there was only a single row in the AB_TABLE topic.
One could argue that, regardless of any buffering, the correct output for the table-table join is a single row. After all, the first row processed should not find a match and the second should. If you feel strongly about this, please raise a bug on GitHub.