Reputation: 93
My understanding of KSQL tables is that they show an "as is" view of our data rather than all the data. So if I have a simple aggregating query and I SELECT from my table, I should see the data as it is at this point in time.
My data (stream):
MY_TOPIC_STREAM:
15 | BEACH | Steven Ebb | over there
24 | CIRCUS | John Doe | an adress
30 | CIRCUS | Alice Small | another address
35 | CIRCUS | Barry Share | a home
35 | CIRCUS | Garry Share | a home
40 | CIRCUS | John Mee | somewhere
45 | CIRCUS | David Three | a place
45 | CIRCUS | Mary Three | a place
45 | CIRCUS | Joffrey Three | a place
My table definition:
CREATE TABLE MY_TABLE WITH (VALUE_FORMAT='AVRO') AS SELECT ROWKEY AS APPLICATION, COUNT(*) AS NUM_APPLICANTS FROM MY_TOPIC_STREAM WHERE header->eventType = 'CIRCUS' GROUP BY ROWKEY;
I am confused as to why I see multiple rows per key in my table, even though the eventual aggregates are correct:
SELECT * FROM MY_TABLE;
APPLICATION | NUM_APPLICANTS
24 | 1
30 | 1
35 | 1   <-- why do I see this?
35 | 2
40 | 1
45 | 1   <-- why do I see this?
45 | 2   <-- why do I see this?
45 | 3
My sink topic shows the same rows as the table output. Presumably this is correct?
I expected my table result to be:
APPLICATION | NUM_APPLICANTS
24 | 1
30 | 1
35 | 2
40 | 1
45 | 3
Outputs abridged for brevity and readability above, but you get the gist.
So - are my expectations of the table and sink topic outputs off the mark?
UPDATE
Matthias's answer below correctly explains that the table and sink topic show changelog events, so it is normal to see intermediate values. What confused me, however, was that I was seeing every intermediate row. It turned out that this was because I was using the Confluent 5.2.1 docker-compose file, which sets the environment variable KSQL_STREAMS_CACHE_MAX_BYTES_BUFFERING=0. This disables caching of intermediate results in KSQL aggregations, so the table shows more rows than expected while still eventually arriving at the correct aggregates. Setting it to e.g. 10MB caused the data to be output as expected. This setting is not immediately obvious in the documentation for those starting to play with KSQL and using Docker to stand up the instances! This issue pointed me in the right direction, and this page documents the parameters. I had spent a long time on this and could not work out why it was not behaving as expected! I hope this helps someone.
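To make the effect of the cache concrete, here is a toy Python model of it. It is only a sketch under stated assumptions, not how Kafka Streams implements its record cache: the function name count_changelog, the pending dictionary, and the single flush at the end of the input are all made up for illustration. A real cache flushes on the commit interval or when it fills up, so some intermediate rows can still appear even with caching enabled.

```python
from collections import OrderedDict

# Keys of the CIRCUS events from the question, in arrival order.
EVENTS = [24, 30, 35, 35, 40, 45, 45, 45]


def count_changelog(keys, cache_enabled):
    """Return the (key, count) updates forwarded downstream.

    Toy model: with the cache disabled, every update is forwarded
    immediately. With it enabled, updates are held per key and only
    the latest value per key is forwarded in one flush at the end.
    """
    counts = {}
    emitted = []
    pending = OrderedDict()  # hypothetical stand-in for the record cache
    for key in keys:
        counts[key] = counts.get(key, 0) + 1
        if cache_enabled:
            pending[key] = counts[key]  # overwrite the older pending value
        else:
            emitted.append((key, counts[key]))
    emitted.extend(pending.items())  # single flush (assumption)
    return emitted


uncached = count_changelog(EVENTS, cache_enabled=False)
cached = count_changelog(EVENTS, cache_enabled=True)
print(uncached)
print(cached)
```

With the cache off, the model emits one row per input event, which matches the output in the question. With it on, only the last count per key is emitted.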
Upvotes: 2
Views: 790
Reputation: 62350
Not sure what version you are using; however, SELECT * FROM MY_TABLE; does not return the current content of the table, but the table's changelog stream (this holds for older versions; in newer versions the query you show is not valid, as the syntax was changed).
Since the transition from KSQL to ksqlDB, the query you showed is called a push query and is expressed as SELECT * FROM my_table EMIT CHANGES;.
Furthermore, ksqlDB introduced pull queries, which allow you to look up the current state. However, SELECT * FROM my_table; is not supported as a pull query yet (it will be added in the future). At the moment you can only do table lookups for a specific key, i.e., there must be a WHERE clause.
Check out the docs for more details: https://docs.ksqldb.io/en/latest/concepts/queries/pull/
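The difference between the changelog and the current state can be sketched in a few lines of Python. This is only an illustration and not ksqlDB code: the materialize helper is a made-up name, and the changelog list is copied from the output in the question.

```python
# Changelog rows as shown in the question: (APPLICATION, NUM_APPLICANTS).
changelog = [
    (24, 1), (30, 1), (35, 1), (35, 2),
    (40, 1), (45, 1), (45, 2), (45, 3),
]


def materialize(updates):
    """Apply updates in order; a later update for a key replaces the earlier one."""
    table = {}
    for key, value in updates:
        table[key] = value
    return table


table = materialize(changelog)  # current state: one row per key
lookup = table[45]              # analogous to a lookup for one specific key
print(table)
print(lookup)
```

Reading the changelog shows every update, while the materialized table holds only the latest value per key, which is what a lookup by key returns.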
Upvotes: 2