Reputation: 4218
I'm streaming a few tables from MS SQL Server into Kafka using CDC and the Debezium connector. One of the SQL tables is represented in KSQL as a stream, and the rest are KSQL tables that I want to join to the stream to enrich the data. So I have a chain of KSQL streams, each joining a new KSQL table to the output of the previous KSQL stream in the chain.
Let A be a KSQL stream.
Let B through D be KSQL tables.
A + B = A'
A' + C = A''
A'' + D = A'''
This works just fine for A' and A'', but A''' is giving me the error: Source table (D) key column (X) is not the column used in the join criteria (Y).
Why am I able to produce streams A' and A'' without incident but A''' gives me this error?
I found some SO questions that indicated I should rekey D using the PARTITION BY clause, but that causes another error: mismatched input 'PARTITION' expecting ';'. Seems like CREATE TABLE doesn't play nice with PARTITION BY, and if I use CREATE STREAM instead, it tells me I can't create a stream from a table and should use CREATE TABLE.
HALP!
UPDATE: Obfuscated query added.
CREATE STREAM A_Enriched_Phase7
WITH(PARTITIONS=1)
AS
SELECT *
FROM A_Enriched_Phase6 a
JOIN KsqlTableD d ON a.X = d.X
PARTITION BY a.ID;
Upvotes: 0
Views: 2016
Reputation: 32080
A few things here, and it's harder to answer without your full set of source SQL statements.
When you declare a table, you use the WITH (KEY='col_foo') syntax to inform KSQL that a field in the schema has the same value as the message key. What you are not doing is instructing KSQL to key the messages using the col_foo field. So let us imagine that your table is defined thus:
ksql> CREATE TABLE KsqlTableD (COL1 VARCHAR, X VARCHAR, Y VARCHAR) WITH (KAFKA_TOPIC='D', VALUE_FORMAT='AVRO', PARTITIONS=1, KEY='Y');
Message
---------------
Table created
---------------
Insert some data:
ksql> INSERT INTO KsqlTableD (COL1, X, Y) VALUES ('A','B','C');
Now try to join on a column that is not the key column of the table:
ksql> SELECT A.*,D.* FROM MYSTREAM A LEFT JOIN KsqlTableD D ON A.COL1=D.X;
Source table (D) key column (D.Y) is not the column used in the join criteria (D.X). Only the table's key column or 'ROWKEY' is supported in the join criteria.
Statement: SELECT A.*,D.* FROM MYSTREAM A LEFT JOIN KsqlTableD D ON A.COL1=D.X;
Caused by: Source table (D) key column (D.Y) is not the column used in the join
criteria (D.X). Only the table's key column or 'ROWKEY' is supported in the join
criteria.
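Before picking a fix, it can help to confirm which column the table is declared as keyed on, and what the underlying Kafka messages are actually keyed by. A minimal sketch, assuming the table and topic names from the example above (KsqlTableD over topic 'D'); substitute your own:

```sql
-- Show the table's schema and metadata, including its declared key field.
DESCRIBE EXTENDED KsqlTableD;

-- Dump the raw messages from the underlying topic, including each message key.
-- This keeps running until you cancel it (Ctrl-C).
PRINT 'D' FROM BEGINNING;
```

If the key field reported by DESCRIBE EXTENDED is not the column in your ON clause, you will hit the error shown above.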
So you need to either join on the key column:
ksql> SELECT A.*,D.* FROM MYSTREAM A LEFT JOIN KsqlTableD D ON A.COL1=D.Y;
1565700762038 | 1 | 1 | FOO | null | null | null | null | null
Or you need to rekey your table data and then join on it:
ksql> CREATE STREAM D WITH (KAFKA_TOPIC='D', VALUE_FORMAT='AVRO');
Message
----------------
Stream created
----------------
ksql> CREATE STREAM D_REKEY AS SELECT * FROM D PARTITION BY X;
Message
----------------------------
Stream created and running
----------------------------
ksql> CREATE TABLE D_X WITH (KAFKA_TOPIC='D_REKEY', VALUE_FORMAT='AVRO', KEY='X');
Message
---------------
Table created
---------------
ksql> SELECT A.*,D.* FROM MYSTREAM A LEFT JOIN D_X D ON A.COL1=D.X;
1565700762038 | 1 | 1 | FOO | null | null | null | null | null
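Applied to the names in your question, the same rekeying pattern might look roughly like the sketch below. Several things here are assumptions, since I can't see your definitions: 'D_SOURCE_TOPIC' is a placeholder for whatever topic sits behind KsqlTableD, the AVRO value format is a guess, and KsqlStreamD, KsqlStreamD_ByX, KsqlTableD_ByX and the topic name 'd_by_x' are hypothetical names made up for illustration.

```sql
-- Assumption: 'D_SOURCE_TOPIC' stands in for the real topic behind KsqlTableD,
-- and the values are Avro. Declare a stream over it so that it can be rekeyed.
CREATE STREAM KsqlStreamD
  WITH (KAFKA_TOPIC='D_SOURCE_TOPIC', VALUE_FORMAT='AVRO');

-- Rekey the data on X, writing to a new (hypothetically named) topic.
CREATE STREAM KsqlStreamD_ByX
  WITH (KAFKA_TOPIC='d_by_x')
  AS SELECT * FROM KsqlStreamD PARTITION BY X;

-- Declare a table over the rekeyed topic, telling KSQL that X matches the key.
CREATE TABLE KsqlTableD_ByX
  WITH (KAFKA_TOPIC='d_by_x', VALUE_FORMAT='AVRO', KEY='X');

-- Your original join, pointed at the rekeyed table instead of KsqlTableD.
CREATE STREAM A_Enriched_Phase7
  WITH (PARTITIONS=1)
  AS
  SELECT *
  FROM A_Enriched_Phase6 a
  JOIN KsqlTableD_ByX d ON a.X = d.X
  PARTITION BY a.ID;
```

Note that the PARTITION BY happens on a stream, not a table, which is why the intermediate stream over the source topic is needed.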
Upvotes: 1