Russell You

Reputation: 41

Kafka KSQL repartition and rekey problem

I have defined a stream:

CREATE STREAM QUOTE (quoteId VARCHAR,
                      counterPartyId VARCHAR)
        WITH (KAFKA_TOPIC='quotes',
              VALUE_FORMAT='JSON',
              KEY='quoteId');

I want to count how many quotes I have received so far, along with the latest quoteId as of each event:

CREATE TABLE KQUOTE AS
    SELECT MAX(CAST(quoteId AS INT)) AS quoteId, COUNT(*) AS COUNT
        FROM QUOTE
        GROUP BY 1;

Next, I turn this table into a stream, because I want the history of the aggregation results. (It seems I have to create the stream from the underlying topic; I can't create a stream directly from the table 'KQUOTE'.)

CREATE stream KQuoteStream (quoteId VARCHAR,
                      count INT)
        WITH (KAFKA_TOPIC='KQUOTE',
              VALUE_FORMAT='JSON',
              KEY='quoteId');

I expected the stream above to use quoteId as its ROWKEY, but it doesn't. As we can see below, the ROWKEY is always 1 (since we group by the constant 1 when creating the table KQUOTE).

ksql> select * from KQuoteStream;
1574121797111 | 1 | 806 | 20
1574121979291 | 1 | 807 | 21

I then tried to repartition the stream by quoteId so that the ROWKEY becomes quoteId:

CREATE STREAM KQuoteStreamByQuoteId AS
    SELECT quoteId, COUNT FROM KQuoteStream PARTITION BY quoteId;

The ROWKEY is still the constant 1:

ksql> select * from KQuoteStreamByQuoteId;
1574121797111 | 1 | 806 | 20
1574121979291 | 1 | 807 | 21

BTW: all topics have a single partition to keep things simple. Does anyone have an idea? Thanks a lot!

Upvotes: 4

Views: 1504

Answers (1)

Almog

Reputation: 741

That's definitely an interesting bug you've surfaced!

The trick here is to understand that WITH(KEY='quoteId') doesn't actually change the key; it is only a hint to ksqlDB that the key field also happens to exist in the value as quoteId. So when you PARTITION BY quoteId, ksqlDB thinks you are partitioning by the existing ROWKEY and does nothing! I agree that this behavior is pretty unintuitive, which is why we are planning to remove the WITH(KEY=...) functionality in favor of something more intuitive (to be determined).

In the meantime, the workaround should be to not specify the key when you create KQuoteStream so that KSQL doesn't optimize away the repartition.
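
As a rough sketch of that workaround, reusing the names from the question: this assumes the earlier definitions of KQuoteStream and KQuoteStreamByQuoteId have been dropped first, and that you are on a KSQL version that still accepts WITH(KEY=...). I haven't run it against your setup, so treat it as a starting point rather than a verified fix.

```sql
-- Declare the stream over the table's underlying topic WITHOUT the KEY hint,
-- so KSQL does not assume that quoteId is already the ROWKEY.
CREATE STREAM KQuoteStream (quoteId VARCHAR,
                            count INT)
        WITH (KAFKA_TOPIC='KQUOTE',
              VALUE_FORMAT='JSON');

-- With no key hint on the source stream, PARTITION BY should no longer be
-- optimized away, so the rows get rekeyed by quoteId.
CREATE STREAM KQuoteStreamByQuoteId AS
    SELECT quoteId, count FROM KQuoteStream PARTITION BY quoteId;
```

If the repartition takes effect, the ROWKEY column in select * from KQuoteStreamByQuoteId should show the quoteId rather than the constant 1.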

Upvotes: 2
