atkayla

Reputation: 8859

How to create KSQL table from a topic with composite key?

I have topic data with the fields stringA and stringB, and I want to use the combination of the two as the key when creating a KSQL table from that topic.

Upvotes: 1

Views: 4621

Answers (2)

Ern Des

Reputation: 51

Just an update to @Robin Moffatt's answer. Use the following:

CREATE STREAM TEST_REKEY AS 
        SELECT STRINGA + STRINGB AS MY_COMPOSITE_KEY, 
               STRINGA, 
               STRINGB, 
               COL3 
          FROM TEST 
          PARTITION BY STRINGA + STRINGB ;

instead of

CREATE STREAM TEST_REKEY AS 
        SELECT STRINGA + STRINGB AS MY_COMPOSITE_KEY, 
               STRINGA, 
               STRINGB, 
               COL3 
          FROM TEST 
          PARTITION BY MY_COMPOSITE_KEY ;

NOTE: column ordering matters. Worked for me! (CLI v0.10.1, Server v0.10.1)

Upvotes: 3

Robin Moffatt

Reputation: 32110

Here's an example. First, I'll create and populate a test stream:

ksql> CREATE STREAM TEST (STRINGA VARCHAR, 
                          STRINGB VARCHAR, 
                          COL3 INT) 
                    WITH (KAFKA_TOPIC='TEST',
                          PARTITIONS=1,
                          VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------

ksql> INSERT INTO TEST (STRINGA, STRINGB, COL3) VALUES ('A','B',1);
ksql> INSERT INTO TEST (STRINGA, STRINGB, COL3) VALUES ('A','B',2);
ksql> INSERT INTO TEST (STRINGA, STRINGB, COL3) VALUES ('C','D',3);
ksql>
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.

ksql> SELECT * FROM TEST EMIT CHANGES LIMIT 3;
+--------------+--------+---------+----------+------+
|ROWTIME       |ROWKEY  |STRINGA  |STRINGB   |COL3  |
+--------------+--------+---------+----------+------+
|1578569329184 |null    |A        |B         |1     |
|1578569331653 |null    |A        |B         |2     |
|1578569339177 |null    |C        |D         |3     |

Note that ROWKEY is null.

Now I'll create a new stream, populated from the first, in which I create the composite column and set it as the key. I'm also including the original fields themselves, but this is optional if you don't need them:

ksql> CREATE STREAM TEST_REKEY AS 
        SELECT STRINGA + STRINGB AS MY_COMPOSITE_KEY, 
               STRINGA, 
               STRINGB, 
               COL3 
          FROM TEST 
          PARTITION BY MY_COMPOSITE_KEY ;

 Message
------------------------------------------------------------------------------------------
 Stream TEST_REKEY created and running. Created by query with query ID: CSAS_TEST_REKEY_9
------------------------------------------------------------------------------------------

Now you have a stream of data with the key set to your composite key:

ksql> SELECT ROWKEY , COL3 FROM TEST_REKEY EMIT CHANGES LIMIT 3;
+---------+-------+
|ROWKEY   |COL3   |
+---------+-------+
|AB       |1      |
|AB       |2      |
|CD       |3      |
Limit Reached
Query terminated
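
To make the re-keying step concrete outside ksqlDB, here is a minimal Python sketch of what partitioning by the concatenated column does to each record. It only simulates the semantics and does not call ksqlDB; `rekey` is a hypothetical helper name, and the sample rows are the three inserted into TEST earlier.

```python
# Simulation only: mimics the effect of re-keying on STRINGA + STRINGB.
# It does not talk to ksqlDB. `rekey` is a hypothetical helper name.

def rekey(records):
    """Return (key, value) pairs keyed by STRINGA + STRINGB."""
    out = []
    for rec in records:
        # Same expression as STRINGA + STRINGB in the query.
        composite = rec["STRINGA"] + rec["STRINGB"]
        value = {"MY_COMPOSITE_KEY": composite, **rec}
        out.append((composite, value))
    return out

# The three rows inserted into TEST; their keys start out as null.
test_stream = [
    {"STRINGA": "A", "STRINGB": "B", "COL3": 1},
    {"STRINGA": "A", "STRINGB": "B", "COL3": 2},
    {"STRINGA": "C", "STRINGB": "D", "COL3": 3},
]

rekeyed = rekey(test_stream)
for key, value in rekeyed:
    print(key, value["COL3"])
```

One thing this makes visible: plain concatenation cannot tell ('A', 'BC') apart from ('AB', 'C'), since both produce the key ABC. If your values can overlap like that, adding a separator to the concatenated expression is one way to avoid it.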

You can also inspect the underlying Kafka topic to verify the key:

ksql> PRINT TEST_REKEY LIMIT 3;
Format:JSON
{"ROWTIME":1578569329184,"ROWKEY":"AB","MY_COMPOSITE_KEY":"AB","STRINGA":"A","STRINGB":"B","COL3":1}
{"ROWTIME":1578569331653,"ROWKEY":"AB","MY_COMPOSITE_KEY":"AB","STRINGA":"A","STRINGB":"B","COL3":2}
{"ROWTIME":1578569339177,"ROWKEY":"CD","MY_COMPOSITE_KEY":"CD","STRINGA":"C","STRINGB":"D","COL3":3}
ksql>

With this done, we can now declare a table on top of the re-keyed topic:

CREATE TABLE TEST_TABLE (ROWKEY VARCHAR KEY, 
                         COL3 INT) 
    WITH (KAFKA_TOPIC='TEST_REKEY', VALUE_FORMAT='JSON');

From this table we can query the state. Note that the composite key AB only shows the latest value, which is part of the semantics of a table (compare to the stream above, in which you see both values - both stream and table are the same Kafka topic):

ksql> SELECT * FROM TEST_TABLE EMIT CHANGES;
+----------------+---------+------+
|ROWTIME         |ROWKEY   |COL3  |
+----------------+---------+------+
|1578569331653   |AB       |2     |
|1578569339177   |CD       |3     |
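
As a rough illustration of the stream/table difference described above, this Python sketch folds the keyed records into "latest value per key". It is a simulation of the semantics under that simplifying assumption, not ksqlDB code; `materialize` is a hypothetical helper name.

```python
# Simulation only: a table is modelled as "latest value per key".
# `materialize` is a hypothetical helper, not a ksqlDB function.

def materialize(keyed_records):
    """Fold a keyed stream into a table: later records overwrite earlier ones."""
    table = {}
    for key, value in keyed_records:
        table[key] = value
    return table

# Keys and COL3 values as they appear in the TEST_REKEY topic.
test_rekey = [("AB", 1), ("AB", 2), ("CD", 3)]

test_table = materialize(test_rekey)
print(test_table)  # {'AB': 2, 'CD': 3}
```

The stream still holds all three records; only the table view collapses the two AB records into the most recent one.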

Upvotes: 2
