dotSlashLeo

Reputation: 21

Error "Invalid join condition: table-table joins require to join on the primary key of the right input table" on joining two tables on Kafka ksqlDB

I need to create a Kafka topic from a combination of nine other topics, all of them produced by the Debezium PostgreSQL source connector in AVRO format. To start, I'm trying (so far unsuccessfully) to combine fields from just two topics.

So, first I create a ksqlDB table based on the "REQUEST" topic:

ksql> CREATE TABLE TB_REQUEST (ID STRUCT<REQUEST_ID BIGINT> PRIMARY KEY)
         WITH (KAFKA_TOPIC='REQUEST', FORMAT='AVRO');

And everything seems fine to me:

ksql> DESCRIBE TB_REQUEST;

Name                 : TB_REQUEST
 Field       | Type
-----------------------------------------------------------------------------------------------------------------------
 ID          | STRUCT<REQUEST_ID BIGINT> (primary key)
 BEFORE      | STRUCT<REQUEST_ID BIGINT, REQUESTER_ID INTEGER, STATUS_ID>
 AFTER       | STRUCT<REQUEST_ID BIGINT, REQUESTER_ID INTEGER, STATUS_ID>
 SOURCE      | STRUCT<VERSION VARCHAR(STRING), CONNECTOR VARCHAR(STRING), NAME VARCHAR(STRING), TS_MS BIGINT, SNAPSHOT VARCHAR(STRING), DB VARCHAR(STRING), SEQUENCE VARCHAR(STRING), SCHEMA VARCHAR(STRING), TABLE VARCHAR(STRING), TXID BIGINT, LSN BIGINT, XMIN BIGINT>
 OP          | VARCHAR(STRING)
 TS_MS       | BIGINT
 TRANSACTION | STRUCT<ID VARCHAR(STRING), TOTAL_ORDER BIGINT, DATA_COLLECTION_ORDER BIGINT>
-----------------------------------------------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;

Then I create another table from the "EMPLOYEE" topic:

ksql> CREATE TABLE TB_EMPLOYEE (ID STRUCT<EMPLOYEE_ID INT> PRIMARY KEY)
         WITH (KAFKA_TOPIC='EMPLOYEE', FORMAT='AVRO');

Again, everything seems ok.

ksql> DESCRIBE TB_EMPLOYEE;

Name                 : TB_EMPLOYEE
 Field       | Type
-----------------------------------------------------------------------------------------------------------------------
 ID          | STRUCT<EMPLOYEE_ID INTEGER> (primary key)
 BEFORE      | STRUCT<EMPLOYEE_ID INTEGER, NAME VARCHAR(STRING), HIRING_DATE DATE>
 AFTER       | STRUCT<EMPLOYEE_ID INTEGER, NAME VARCHAR(STRING), HIRING_DATE DATE>
 SOURCE      | STRUCT<VERSION VARCHAR(STRING), CONNECTOR VARCHAR(STRING), NAME VARCHAR(STRING), TS_MS BIGINT, SNAPSHOT VARCHAR(STRING), DB VARCHAR(STRING), SEQUENCE VARCHAR(STRING), SCHEMA VARCHAR(STRING), TABLE VARCHAR(STRING), TXID BIGINT, LSN BIGINT, XMIN BIGINT>
 OP          | VARCHAR(STRING)
 TS_MS       | BIGINT
 TRANSACTION | STRUCT<ID VARCHAR(STRING), TOTAL_ORDER BIGINT, DATA_COLLECTION_ORDER BIGINT>
-----------------------------------------------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;

But when I try to create my target table by joining the two tables on the employee ID:

ksql> CREATE TABLE REQUEST_EMPLOYEE AS 
         SELECT RQ.ID->REQUEST_ID, RQ.AFTER->REQUESTER_ID, RQ.AFTER->STATUS_ID, EM.ID->EMPLOYEE_ID, EM.AFTER->NAME AS REQUESTER
         FROM TB_REQUEST RQ
         JOIN TB_EMPLOYEE EM ON RQ.AFTER->REQUESTER_ID = EM.ID->EMPLOYEE_ID;

I got the following error:

Could not determine output schema for query due to error: Invalid join condition: table-table joins require to join on the primary key of the right input table. Got RQ.AFTER->REQUESTER_ID = EM.ID->EMPLOYEE_ID.
Statement: CREATE TABLE REQUEST_EMPLOYEE WITH (KAFKA_TOPIC='REQUEST_EMPLOYEE', PARTITIONS=1, REPLICAS=1) AS SELECT
  RQ.ID->REQUEST_ID REQUEST_ID,
  RQ.AFTER->REQUESTER_ID REQUESTER_ID,
  RQ.AFTER->STATUS_ID STATUS_ID,
  EM.ID->EMPLOYEE_ID EMPLOYEE_ID,
  EM.AFTER->NAME REQUESTER
FROM TB_REQUEST RQ
INNER JOIN TB_EMPLOYEE EM ON ((RQ.AFTER->REQUESTER_ID = EM.ID->EMPLOYEE_ID))
EMIT CHANGES;

Looking at the output of the "DESCRIBE TB_EMPLOYEE" command, it looks to me like "EM.ID->EMPLOYEE_ID" is the right choice. What am I missing?

Thanks in advance.

PS: my ksqlDB version is 0.21.0

Upvotes: 2

Views: 2396

Answers (1)

Felipe Tapia

Reputation: 369

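I think you should use at least one row key in your join condition. In previous versions of ksqlDB the only way to join tables was on their row keys; in your current version (0.21.0) it is also possible to join on a foreign key, as long as the right-hand side of the condition is the primary key of the right table.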

Check the following example:

CREATE TABLE orders_with_users AS
SELECT * FROM orders JOIN users ON orders.u_id = users.u_id EMIT CHANGES;

where u_id is defined as the primary key of users, and is therefore its row key:

CREATE TABLE users (
    u_id VARCHAR PRIMARY KEY,
    name VARCHAR
  ) WITH (
    kafka_topic = 'users',
    partitions = 3,
    value_format = 'json'
  );
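The example never defines orders. A minimal sketch of what it could look like, assuming (hypothetically) that orders has its own primary key o_id and carries u_id as an ordinary value column; that non-key u_id is the foreign key that the join matches against the primary key of users. The column names o_id and amount are made up for illustration.

```sql
-- Hypothetical orders table: o_id is its own primary key, while u_id is an
-- ordinary value column holding the foreign key into users.
CREATE TABLE orders (
    o_id VARCHAR PRIMARY KEY,
    u_id VARCHAR,
    amount DOUBLE
  ) WITH (
    kafka_topic = 'orders',
    partitions = 3,
    value_format = 'json'
  );
```

With these definitions, orders.u_id = users.u_id compares a plain column of the left table with the primary key column of the right table, which is the shape of condition the error message asks for.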

A similar statement, which relies on the implicit ROWKEY column that older versions of ksqlDB added to every table:

CREATE TABLE orders_with_users AS
    SELECT * FROM orders JOIN users ON orders.u_id = users.ROWKEY EMIT CHANGES;

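Another observation: ksqlDB considers the key of your TB_EMPLOYEE table to be STRUCT<EMPLOYEE_ID INTEGER>, not just an INTEGER. It therefore expects the join to compare against that whole struct key (EM.ID, with a struct of the same schema on the other side) rather than against EM.ID->EMPLOYEE_ID, which is only a field inside the key.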

So you can perform the following steps before creating your table, to re-key the employee data on a primitive EMPLOYEE_ID:

CREATE STREAM STREAM_EMPLOYEE (ID STRUCT<EMPLOYEE_ID INT> KEY)
         WITH (KAFKA_TOPIC='EMPLOYEE', FORMAT='AVRO');

CREATE STREAM STREAM_REKEY_EMPLOYEE 
AS SELECT ID->EMPLOYEE_ID employee_id, * FROM STREAM_EMPLOYEE
PARTITION BY ID->EMPLOYEE_ID
EMIT CHANGES;


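-- Drop the original TB_EMPLOYEE first (without DELETE TOPIC this keeps the EMPLOYEE topic)
DROP TABLE IF EXISTS TB_EMPLOYEE;

CREATE TABLE TB_EMPLOYEE (EMPLOYEE_ID INT PRIMARY KEY)
         WITH (KAFKA_TOPIC='STREAM_REKEY_EMPLOYEE', FORMAT='AVRO');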

Then use the EMPLOYEE_ID key column in the join. In general, try to use primitive types for your primary keys.
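Putting it together for the original query, a sketch that has not been tested on 0.21.0. It assumes TB_EMPLOYEE has been recreated as above with a primitive EMPLOYEE_ID INT key. TB_REQUEST_FLAT is a hypothetical helper table that lifts REQUESTER_ID out of the AFTER struct; it may be unnecessary if your version accepts RQ.AFTER->REQUESTER_ID directly as the foreign-key expression. RQ.ID itself is also selected, assuming that a foreign-key join produces a table keyed by the left table's primary key and that ksqlDB wants that key in the projection.

```sql
-- Hypothetical helper table: keeps TB_REQUEST's key (ID) and lifts the
-- fields needed for the join out of the AFTER struct.
CREATE TABLE TB_REQUEST_FLAT AS
  SELECT ID,
         AFTER->REQUESTER_ID AS REQUESTER_ID,
         AFTER->STATUS_ID AS STATUS_ID
  FROM TB_REQUEST
  EMIT CHANGES;

-- Foreign-key join: the right side of the condition is exactly the primary
-- key column of the re-keyed TB_EMPLOYEE, and both sides are INTEGER.
CREATE TABLE REQUEST_EMPLOYEE AS
  SELECT RQ.ID AS ID,
         RQ.ID->REQUEST_ID AS REQUEST_ID,
         RQ.REQUESTER_ID AS REQUESTER_ID,
         RQ.STATUS_ID AS STATUS_ID,
         EM.AFTER->NAME AS REQUESTER
  FROM TB_REQUEST_FLAT RQ
  JOIN TB_EMPLOYEE EM ON RQ.REQUESTER_ID = EM.EMPLOYEE_ID
  EMIT CHANGES;
```

The key difference from the failing statement is the right-hand side of ON: it names the primary key column EM.EMPLOYEE_ID directly instead of dereferencing a field inside a struct key.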

Upvotes: 5
