I have defined a stream:
CREATE STREAM QUOTE (quoteId VARCHAR,
counterPartyId VARCHAR)
WITH (KAFKA_TOPIC='quotes',
VALUE_FORMAT='JSON',
KEY='quoteId');
I want to aggregate how many quotes I have received so far, and the last quoteId at each event:
CREATE TABLE KQUOTE AS
SELECT Max(CAST(quoteId as INT)) as quoteId,COUNT(*) AS COUNT
FROM QUOTE
GROUP BY 1;
Next, I turn this table into a stream, because I want to see the history of the aggregation results. (It seems I have to use the underlying topic to create the stream; I can't create a stream directly from the table KQUOTE.)
CREATE stream KQuoteStream (quoteId VARCHAR,
count INT)
WITH (KAFKA_TOPIC='KQUOTE',
VALUE_FORMAT='JSON',
KEY='quoteId');
I expected the stream above to use quoteId as its ROWKEY, but it doesn't. As shown below, ROWKEY is always 1 (because we group by the constant 1 when creating the table KQUOTE):
ksql> select * from KQuoteStream;
1574121797111 | 1 | 806 | 20
1574121979291 | 1 | 807 | 21
I then tried to repartition the stream by quoteId so that ROWKEY becomes quoteId:
CREATE stream KQuoteStreamByQuoteId
as
SELECT quoteId, COUNT FROM KQuoteStream PARTITION BY quoteId;
But ROWKEY is still the constant 1:
ksql> select * from KQuoteStreamByQuoteId;
1574121797111 | 1 | 806 | 20
1574121979291 | 1 | 807 | 21
BTW: all topics have a single partition to keep things simple. Does anyone have an idea? Thanks a lot!
That's definitely an interesting bug you've surfaced!
The trick here is to understand that WITH(KEY='quoteId') doesn't actually change the key: it's only a hint to ksqlDB that the key field also happens to exist in the value as quoteId. Then, when you PARTITION BY quoteId, ksqlDB thinks you are partitioning by ROWKEY, so it does nothing! I agree that this behavior is pretty unintuitive, which is why we are planning to remove the WITH(KEY=...) functionality in favor of something more intuitive (to be determined).
In the meantime, the workaround should be to not specify the key when you create KQuoteStream, so that KSQL doesn't optimize away the repartition.
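A minimal sketch of that workaround, reusing the stream, topic and column names from the question. It is written for the KSQL syntax the question uses and has not been tested against a particular release; the only change from the question is dropping the KEY property.

```sql
-- Declare the stream over the table's topic WITHOUT the KEY property,
-- so ksqlDB no longer assumes that ROWKEY already equals quoteId.
CREATE STREAM KQuoteStream (quoteId VARCHAR,
                            count INT)
  WITH (KAFKA_TOPIC='KQUOTE',
        VALUE_FORMAT='JSON');

-- With no KEY hint, PARTITION BY quoteId should trigger a real
-- repartition, and the new stream's ROWKEY is taken from quoteId.
CREATE STREAM KQuoteStreamByQuoteId AS
  SELECT quoteId, count
  FROM KQuoteStream
  PARTITION BY quoteId;

-- Check the result: ROWKEY should now match the quoteId column.
SELECT ROWKEY, quoteId, count FROM KQuoteStreamByQuoteId;
```

If the streams from the question already exist, they have to be dropped first, and any persistent query that reads from or writes to them may need to be terminated before the DROP succeeds. Later ksqlDB releases changed how keys are declared and require EMIT CHANGES on push queries like the final SELECT, so the exact syntax may differ on newer versions.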