padavan

KSQL KTable+KTable join: duplicate result anomaly

I am trying to inner join a KTable with another KTable.

Tables a and b:

 create table a_table(r string, time string) with (Kafka_topic='a', Key='r', Value_format='json');
 create table b_table(r string, time string) with (Kafka_topic='b', Key='r', Value_format='json');

Inner join tables a and b on the r key:

create table ab_table as select * from a_table inner join b_table on a_table.r = b_table.r emit changes;

1) Use case: insert new data slowly

 ksql> insert into a_table values('1','1', 'timeA');
 -- wait 5 seconds
 ksql> insert into b_table values('1','1', 'timeB');

select * from ab_table emit changes; -- returns 1 row

print AB_TABLE from beginning; -- returns 1 row

2) Use case: insert new data quickly (both inserts on one line)

 ksql> insert into a_table values('2','2', 'timeA');insert into b_table values('2','2', 'timeB');

  ksql>  print a from beginning;
    Key format: KAFKA_STRING
    Value format: JSON or KAFKA_STRING
    rowtime: 5/23/20 4:44:06 PM UTC, key: 2, value: {"R":"2","TIME":"timeA"}

   ksql> print b from beginning;
    Key format: KAFKA_STRING
    Value format: JSON or KAFKA_STRING
    rowtime: 5/23/20 4:44:06 PM UTC, key: 2, value: {"R":"2","TIME":"timeB"}

select * from ab_table emit changes; -- returns 1 row

print AB_TABLE from beginning; -- returns 2 rows

rowtime: 5/23/20 4:44:06 PM UTC, key: 2, value: {"A_TABLE_ROWTIME":1590252246657,"A_TABLE_ROWKEY":"2","A_TABLE_R":"2","A_TABLE_TIME":"timeA","B_TABLE_ROWTIME":1590252246657,"B_TABLE_ROWKEY":"2","B_TABLE_R":"2","B_TABLE_TIME":"timeB"}
rowtime: 5/23/20 4:44:06 PM UTC, key: 2, value: {"A_TABLE_ROWTIME":1590252246680,"A_TABLE_ROWKEY":"2","A_TABLE_R":"2","A_TABLE_TIME":"timeA","B_TABLE_ROWTIME":1590252246680,"B_TABLE_ROWKEY":"2","B_TABLE_R":"2","B_TABLE_TIME":"timeB"}

What the hell? Why, in the second use case, do I get two duplicate rows in the topic?

Update: info about the topics and tables

    name                 : B_TABLE
     Field   | Type                      
    -------------------------------------
     ROWTIME | BIGINT           (system) 
     ROWKEY  | VARCHAR(STRING)  (system) 
     R       | VARCHAR(STRING)           
     TIME    | VARCHAR(STRING)  

    name                 : A_TABLE
     Field   | Type                      
    -------------------------------------
     ROWTIME | BIGINT           (system) 
     ROWKEY  | VARCHAR(STRING)  (system) 
     R       | VARCHAR(STRING)           
     TIME    | VARCHAR(STRING)  

    Name                 : AB_TABLE
     Field           | Type                      
    ---------------------------------------------
     ROWTIME         | BIGINT           (system) 
     ROWKEY          | VARCHAR(STRING)  (system) 
     A_TABLE_ROWTIME | BIGINT                    
     A_TABLE_ROWKEY  | VARCHAR(STRING)           
     A_TABLE_R       | VARCHAR(STRING)           
     A_TABLE_TIME    | VARCHAR(STRING)           
     B_TABLE_ROWTIME | BIGINT                    
     B_TABLE_ROWKEY  | VARCHAR(STRING)           
     B_TABLE_R       | VARCHAR(STRING)           
     B_TABLE_TIME    | VARCHAR(STRING)     


topic "a" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1

topic "b" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1

 topic "AB_TABLE" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1


Answers (1)

Andrew Coates

I've worked out what's going on here: it's to do with buffering.

By default, ksqlDB buffers the input from the two source table changelogs, i.e. topics a and b. (This buffering can be useful for compacting several messages that report changes to the same key into a single output.)

When you fire updates into both tables at once, the buffering means that both tables are already populated by the time the buffer is flushed. Because an update on either side of a table-table join produces an output, each input event finds a match on the other side, resulting in two outputs to topic AB_TABLE.
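To make the mechanism concrete, here is a minimal toy model in Python. It is not ksqlDB or Kafka Streams code: `run_join` and the `buffered` flag are hypothetical names, each table is just a dict, and "buffering" is modelled as storing every update first and forwarding the (compacted) updates to the join afterwards. With immediate forwarding the first insert finds no match, so only one row is emitted; with buffering both inserts see each other, so two rows are emitted.

```python
from typing import Dict, List, Tuple

# Hypothetical toy model of an inner table-table join; NOT ksqlDB internals.
Event = Tuple[str, str, str]   # (side, key, value), side is "a" or "b"
Row = Tuple[str, str, str]     # (key, a_value, b_value)


def run_join(events: List[Event], buffered: bool) -> List[Row]:
    stores: Dict[str, Dict[str, str]] = {"a": {}, "b": {}}
    other = {"a": "b", "b": "a"}
    outputs: List[Row] = []

    def forward(side: str, key: str) -> None:
        # Join processor: an update on either side looks up the other side.
        if key in stores[other[side]]:
            outputs.append((key, stores["a"][key], stores["b"][key]))

    if buffered:
        # Every update lands in its store before anything is forwarded,
        # so both sides are populated when the "buffer" is flushed.
        for side, key, value in events:
            stores[side][key] = value
        # Compact: at most one forwarded update per (side, key).
        pending = dict.fromkeys((side, key) for side, key, _ in events)
        for side, key in pending:
            forward(side, key)
    else:
        # Each update is stored and forwarded immediately, one at a time.
        for side, key, value in events:
            stores[side][key] = value
            forward(side, key)
    return outputs


if __name__ == "__main__":
    fast_inserts = [("a", "2", "timeA"), ("b", "2", "timeB")]
    print(run_join(fast_inserts, buffered=False))  # [('2', 'timeA', 'timeB')]
    print(run_join(fast_inserts, buffered=True))   # two identical rows
```

In this model, setting the buffer size to 0 corresponds to `buffered=False`.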

PRINT AB_TABLE is correctly showing both rows in the changelog.

However, SELECT * FROM AB_TABLE EMIT CHANGES also buffers its input, and this buffering compacts the two changes into a single output.
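The compaction step can be sketched as "latest value per key wins". This is a hypothetical helper (`compact` is not a ksqlDB API), applied to a trimmed-down copy of the two AB_TABLE changelog records shown by PRINT above; both share key 2, so only the later one survives.

```python
from typing import Dict, List, Tuple

# Hypothetical sketch of key-based compaction; NOT ksqlDB internals.
Record = Tuple[str, dict]  # (key, value)


def compact(records: List[Record]) -> List[Record]:
    """Keep only the latest value per key, in first-seen key order."""
    latest: Dict[str, dict] = {}
    for key, value in records:
        latest[key] = value  # a later update for a key overwrites the earlier one
    return list(latest.items())


if __name__ == "__main__":
    # Subset of the fields from the two PRINT AB_TABLE rows.
    changelog = [
        ("2", {"A_TABLE_ROWTIME": 1590252246657, "A_TABLE_TIME": "timeA", "B_TABLE_TIME": "timeB"}),
        ("2", {"A_TABLE_ROWTIME": 1590252246680, "A_TABLE_TIME": "timeA", "B_TABLE_TIME": "timeB"}),
    ]
    result = compact(changelog)
    print(len(result))                          # 1
    print(result[0][1]["A_TABLE_ROWTIME"])      # 1590252246680
```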

Buffering can be controlled via the cache.max.bytes.buffering setting. For example, you can turn off buffering with:

-- turn off buffering:
SET 'cache.max.bytes.buffering' = 0;

I ran your example again after running the above, and there was only a single row in the AB_TABLE topic.

One could argue that, regardless of any buffering, the correct output for the table-table join is only a single row. After all, the first row processed should not find a match and the second should. If you feel strongly about this, please raise a bug on GitHub.
