Reputation: 191
I have a topic called customers and have created a stream for it:
CREATE STREAM customers_stream (customerId INT, isActive BOOLEAN)
WITH (KAFKA_TOPIC='customers', VALUE_FORMAT='json');
My producer for the customers topic writes an Integer key and a JSON value. But when I print the topic, the row key shows up as a binary value:
ksql> print 'customers';
Format:JSON
{"ROWTIME":1570305904984,"ROWKEY":"\u0000\u0000\u0003�","customerId":1001,"isActive":true}
{"ROWTIME":1570307584257,"ROWKEY":"\u0000\u0000\u0003�","customerId":1002,"isActive":true}
Now if I create a table, it results in a single row (maybe because the row key is the same?):
CREATE TABLE customers (customerId INT, isActive BOOLEAN)
WITH (KAFKA_TOPIC='customers', KEY='customerId',VALUE_FORMAT='json');
After searching the web I came across this article https://www.confluent.io/stream-processing-cookbook/ksql-recipes/setting-kafka-message-key and created a new stream by repartitioning on the key:
CREATE STREAM customers_stream2 AS \
SELECT * FROM customers_stream \
PARTITION BY customerId;
So how do I create a table that holds the latest values of the customer data?
Creating a table from the stream results in an error:
CREATE TABLE customers_2_table_active AS
SELECT CUSTOMERID,ISACTIVE
FROM customers_stream2;
Invalid result type. Your SELECT query produces a STREAM. Please use CREATE STREAM AS SELECT statement instead.
I need the latest value of the various rows so that another microservice can query the new table.
Thank you in advance
Upvotes: 1
Views: 2632
Reputation: 62360
Rekeying seems to be the right approach; however, you cannot convert a STREAM into a TABLE directly.
Note that your rekeyed stream customers_stream2 is written into a corresponding topic. Hence, you should be able to create a new TABLE from the stream's topic to get the latest value per key.
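As a rough sketch of what that could look like: the statement below assumes the rekeyed stream writes to a topic named CUSTOMERS_STREAM2 (KSQL normally defaults to the upper-cased stream name, but please verify the actual topic name first), and the table name customers_latest is only a placeholder. The KEY property follows the syntax used in the question and may not apply to newer ksqlDB versions.

```sql
-- Check which Kafka topic backs the rekeyed stream before creating the table.
DESCRIBE EXTENDED customers_stream2;

-- Assumption: the backing topic is CUSTOMERS_STREAM2; replace it with the name reported above.
-- customers_latest is a hypothetical table name.
CREATE TABLE customers_latest (customerId INT, isActive BOOLEAN)
  WITH (KAFKA_TOPIC='CUSTOMERS_STREAM2', VALUE_FORMAT='json', KEY='customerId');
```

Because the rekeyed topic is keyed by customerId, the table should then keep one row per customer, updated to the most recent value.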
Upvotes: 2