I have some topic data with the fields stringA and stringB, and I am just trying to use them together as the key when creating a KSQL table from a topic.
Just an update to @Robin Moffat's answer. Use the statement below:
CREATE STREAM TEST_REKEY AS
SELECT STRINGA + STRINGB AS MY_COMPOSITE_KEY,
STRINGA,
STRINGB,
COL3
FROM TEST
PARTITION BY STRINGA + STRINGB ;
instead of:
CREATE STREAM TEST_REKEY AS
SELECT STRINGA + STRINGB AS MY_COMPOSITE_KEY,
STRINGA,
STRINGB,
COL3
FROM TEST
PARTITION BY MY_COMPOSITE_KEY ;
NOTE: column ordering matters. Worked for me! (CLI v0.10.1, Server v0.10.1)
Here's an example. First, I'll create and populate a test stream:
ksql> CREATE STREAM TEST (STRINGA VARCHAR,
STRINGB VARCHAR,
COL3 INT)
WITH (KAFKA_TOPIC='TEST',
PARTITIONS=1,
VALUE_FORMAT='JSON');
Message
----------------
Stream created
----------------
ksql> INSERT INTO TEST (STRINGA, STRINGB, COL3) VALUES ('A','B',1);
ksql> INSERT INTO TEST (STRINGA, STRINGB, COL3) VALUES ('A','B',2);
ksql> INSERT INTO TEST (STRINGA, STRINGB, COL3) VALUES ('C','D',3);
ksql>
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> SELECT * FROM TEST EMIT CHANGES LIMIT 3;
+--------------+--------+---------+----------+------+
|ROWTIME |ROWKEY |STRINGA |STRINGB |COL3 |
+--------------+--------+---------+----------+------+
|1578569329184 |null |A |B |1 |
|1578569331653 |null |A |B |2 |
|1578569339177 |null |C |D |3 |
Note that ROWKEY is null.
Now I'll create a new stream, populated from the first, that builds the composite column and sets it as the key. I'm also including the original fields themselves, but this is optional if you don't need them:
ksql> CREATE STREAM TEST_REKEY AS
SELECT STRINGA + STRINGB AS MY_COMPOSITE_KEY,
STRINGA,
STRINGB,
COL3
FROM TEST
PARTITION BY MY_COMPOSITE_KEY ;
Message
------------------------------------------------------------------------------------------
Stream TEST_REKEY created and running. Created by query with query ID: CSAS_TEST_REKEY_9
------------------------------------------------------------------------------------------
Now you have a stream of data with the key set to your composite key:
ksql> SELECT ROWKEY , COL3 FROM TEST_REKEY EMIT CHANGES LIMIT 3;
+---------+-------+
|ROWKEY |COL3 |
+---------+-------+
|AB |1 |
|AB |2 |
|CD |3 |
Limit Reached
Query terminated
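To make the re-keying step concrete outside ksqlDB, here is a minimal Python sketch of what the CREATE STREAM ... PARTITION BY query does to each record: it derives the new key by concatenating STRINGA and STRINGB and keeps the value columns. The record layout (a list of (key, value) tuples) and the helper name rekey are illustrative assumptions, not ksqlDB APIs; the real query also writes the records to a new Kafka topic partitioned by that key.

```python
from typing import Dict, List, Optional, Tuple

# Each record is (key, value). The source stream has a null (None) key,
# matching the ROWKEY column shown for TEST.
Record = Tuple[Optional[str], Dict[str, object]]

# The three rows inserted into TEST in the example above.
TEST: List[Record] = [
    (None, {"STRINGA": "A", "STRINGB": "B", "COL3": 1}),
    (None, {"STRINGA": "A", "STRINGB": "B", "COL3": 2}),
    (None, {"STRINGA": "C", "STRINGB": "D", "COL3": 3}),
]


def rekey(records: List[Record]) -> List[Record]:
    """Hypothetical helper: re-key each record on STRINGA + STRINGB,
    the same expression the PARTITION BY clause uses."""
    out: List[Record] = []
    for _old_key, value in records:
        composite = str(value["STRINGA"]) + str(value["STRINGB"])
        # Keep the original columns and add the composite one,
        # as the SELECT list of TEST_REKEY does.
        new_value = {"MY_COMPOSITE_KEY": composite, **value}
        out.append((composite, new_value))
    return out


if __name__ == "__main__":
    for key, value in rekey(TEST):
        print(key, value["COL3"])
```

Running it prints the same ROWKEY/COL3 pairs as the query above (AB 1, AB 2, CD 3). One point worth checking against your own data: plain concatenation can map different pairs to the same key, for example ('A', 'BC') and ('AB', 'C') both become 'ABC', so if that can occur a delimiter between the two fields may be worth considering.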
You can also inspect the underlying Kafka topic to verify the key:
ksql> PRINT TEST_REKEY LIMIT 3;
Format:JSON
{"ROWTIME":1578569329184,"ROWKEY":"AB","MY_COMPOSITE_KEY":"AB","STRINGA":"A","STRINGB":"B","COL3":1}
{"ROWTIME":1578569331653,"ROWKEY":"AB","MY_COMPOSITE_KEY":"AB","STRINGA":"A","STRINGB":"B","COL3":2}
{"ROWTIME":1578569339177,"ROWKEY":"CD","MY_COMPOSITE_KEY":"CD","STRINGA":"C","STRINGB":"D","COL3":3}
ksql>
With this done, we can now declare a table on top of the re-keyed topic:
CREATE TABLE TEST_TABLE (ROWKEY VARCHAR KEY,
COL3 INT)
WITH (KAFKA_TOPIC='TEST_REKEY', VALUE_FORMAT='JSON');
From this table we can query the state. Note that for the composite key AB only the latest value is shown, which is part of the semantics of a table (compare this with the stream above, in which you see both values; the stream and the table are both backed by the same Kafka topic):
ksql> SELECT * FROM TEST_TABLE EMIT CHANGES;
+----------------+---------+------+
|ROWTIME |ROWKEY |COL3 |
+----------------+---------+------+
|1578569331653 |AB |2 |
|1578569339177 |CD |3 |
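The difference between reading the same topic as a stream and as a table can be modelled in a few lines of Python: the stream view keeps every record, while the table view folds the records in order and keeps only the latest value per key. This is a simplified sketch assuming records arrive in offset order; the function names are hypothetical, and the tombstone handling (a null value deleting the key) reflects general Kafka table semantics rather than anything shown in the example above.

```python
from typing import Dict, List, Optional, Tuple

# (key, COL3) pairs as they appear in the re-keyed topic TEST_REKEY.
TOPIC: List[Tuple[str, Optional[int]]] = [("AB", 1), ("AB", 2), ("CD", 3)]


def as_stream(records: List[Tuple[str, Optional[int]]]) -> List[Tuple[str, Optional[int]]]:
    """Stream semantics: every record is an independent event."""
    return list(records)


def as_table(records: List[Tuple[str, Optional[int]]]) -> Dict[str, int]:
    """Table semantics: each record upserts its key; a None value
    (a tombstone) deletes the key instead."""
    state: Dict[str, int] = {}
    for key, value in records:
        if value is None:
            state.pop(key, None)
        else:
            state[key] = value
    return state


if __name__ == "__main__":
    print(as_stream(TOPIC))  # [('AB', 1), ('AB', 2), ('CD', 3)]
    print(as_table(TOPIC))   # {'AB': 2, 'CD': 3}
```

The table result matches the two rows returned by the query on TEST_TABLE: AB keeps only its latest COL3 value of 2.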