Walf F

ksqlDB: Joining tables/streams with nested structure (not flattened)

I have two topics, X and Y, each with its own Avro schema. Both topics have a lot of fields. I want to create a stream-table join between them and output the result to another topic in the following format:

{
   id,        // the id used to join them
   x_name: X,
   y_name: Y
}

In other words, I want to join the two with each source nested under its own field. I can join them the normal way, but then all fields are flattened out. Can this be achieved with ksqlDB? I've tried to find a good way of doing this, without success.

EDIT:

Adding more information and an example. Say I have two topics with this kind of data:

product_supply

{
  "product_id": 1,
  "name": "name",
  "stock": 11,
  "price": "141",
  "storage_ids": [1, 2, 3]
}

product_information

{
  "product_id": 1,
  "description": "151",
  "manufacturer": "ABC",
  "Vendor_id": "5"
}

I'd like to use ksqlDB to join these in an unflattened (nested) manner and publish the result to a topic, like this:

{
  "product_id": 1,
  "product_information": {
      "product_id": 1,
      "description": "151",
      "manufacturer": "ABC",
      "Vendor_id": "5"
  },
  "product_supply": {
      "product_id": 1,
      "name": "name",
      "stock": 11,
      "price": "141",
      "storage_ids": [1, 2, 3]
  }
}

I've added a schema for each topic and would like, if possible, to use those schemas without having to explicitly define each field in ksqlDB.

Answers (1)

Robin Moffatt

There's a good guide on working with structured data in ksqlDB. Based on it, I was able to get this to work:

  • Create sample data

    CREATE STREAM PRODUCT_SUPPLY (PRODUCT_ID INT, NAME VARCHAR, STOCK INT, PRICE INT, STORAGE_IDS ARRAY<INT>) WITH (KAFKA_TOPIC='PRODUCT_SUPPLY', REPLICAS=1,PARTITIONS=6,VALUE_FORMAT='AVRO');
    CREATE TABLE PRODUCT_INFORMATION (PRODUCT_ID INT PRIMARY KEY, DESCRIPTION VARCHAR, MANUFACTURER VARCHAR, VENDOR_ID INT) WITH (KAFKA_TOPIC='PRODUCT_INFO', REPLICAS=1,PARTITIONS=6,VALUE_FORMAT='AVRO');
    INSERT INTO PRODUCT_SUPPLY VALUES(1,'NAME',11,141,ARRAY[1,2,3]);
    INSERT INTO PRODUCT_INFORMATION values (1,'151','abc',5);
    
  • Query the data

    SET 'auto.offset.reset' = 'earliest';
    
    SELECT PS.PRODUCT_ID AS PRODUCT_ID,
          STRUCT(NAME        := PS.NAME,
                  STOCK       := PS.STOCK,
                  PRICE       := PS.PRICE,
                  STORAGE_IDS := PS.STORAGE_IDS) AS PRODUCT_SUPPLY,
          STRUCT(DESCRIPTION  := PI.DESCRIPTION,
                  MANUFACTURER := PI.MANUFACTURER,
                  VENDOR_ID    := PI.VENDOR_ID) AS PRODUCT_INFORMATION
      FROM PRODUCT_SUPPLY PS
          LEFT JOIN PRODUCT_INFORMATION PI
          ON PS.PRODUCT_ID=PI.PRODUCT_ID
    EMIT CHANGES LIMIT 1;
    
    +-------------------------+-------------------------+-------------------------+
    |PRODUCT_ID               |PRODUCT_SUPPLY           |PRODUCT_INFORMATION      |
    +-------------------------+-------------------------+-------------------------+
    |1                        |{NAME=NAME, STOCK=11, PRI|{DESCRIPTION=151, MANUFAC|
    |                         |CE=141, STORAGE_IDS=[1, 2|TURER=abc, VENDOR_ID=5}  |
    |                         |, 3]}                    |                         |
    Limit Reached
    Query terminated
    
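The query above is a transient push query that only prints rows to the console. To publish the nested result to a Kafka topic, as the question asks, the same SELECT can likely be wrapped in a persistent CREATE STREAM ... AS SELECT. This is a minimal sketch. The output stream name PRODUCT_ENRICHED, its topic name, and the PRODUCT_KEY alias are hypothetical choices. It also assumes a ksqlDB version recent enough to provide the AS_VALUE function. In a persistent join, the selected join column becomes the key of the result and is written to the Kafka message key rather than the value. AS_VALUE copies it into the value as well, so the output record carries PRODUCT_ID next to the two nested structs.

```sql
-- Hypothetical output stream and topic names; adjust to your environment.
CREATE STREAM PRODUCT_ENRICHED
  WITH (KAFKA_TOPIC='PRODUCT_ENRICHED', VALUE_FORMAT='AVRO') AS
  SELECT PS.PRODUCT_ID           AS PRODUCT_KEY,  -- join column, becomes the message key
         AS_VALUE(PS.PRODUCT_ID) AS PRODUCT_ID,   -- copy of the key kept in the message value
         -- Each STRUCT is serialized as a nested Avro record in the output value.
         STRUCT(NAME        := PS.NAME,
                STOCK       := PS.STOCK,
                PRICE       := PS.PRICE,
                STORAGE_IDS := PS.STORAGE_IDS) AS PRODUCT_SUPPLY,
         STRUCT(DESCRIPTION  := PI.DESCRIPTION,
                MANUFACTURER := PI.MANUFACTURER,
                VENDOR_ID    := PI.VENDOR_ID)  AS PRODUCT_INFORMATION
    FROM PRODUCT_SUPPLY PS
         LEFT JOIN PRODUCT_INFORMATION PI
         ON PS.PRODUCT_ID = PI.PRODUCT_ID
  EMIT CHANGES;
```

Unlike the push query, this statement keeps running on the server and writes every joined row to the output topic until the query is terminated.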

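The question also asks whether the registered Avro schemas can be reused instead of declaring every field. With VALUE_FORMAT='AVRO' and Schema Registry configured for the ksqlDB server, ksqlDB can infer the value columns, so the source declarations can probably be shortened. The sketch below rests on several assumptions:

- The topics are named product_supply and product_information, as in the question.
- The table's message key holds the product id as an INT in ksqlDB's default KAFKA key format.
- PRODUCT_KEY is a hypothetical name, chosen so the key column does not clash with the product_id field already present in the value schema.

```sql
-- Assumes Schema Registry is configured; value columns are inferred from
-- the Avro schema registered for each topic.
CREATE STREAM PRODUCT_SUPPLY
  WITH (KAFKA_TOPIC='product_supply', VALUE_FORMAT='AVRO');

-- A table still needs its primary key declared; only the key is listed here
-- and the value columns are inferred. PRODUCT_KEY is a hypothetical name.
CREATE TABLE PRODUCT_INFORMATION (PRODUCT_KEY INT PRIMARY KEY)
  WITH (KAFKA_TOPIC='product_information', VALUE_FORMAT='AVRO');
```

This only shortens the source declarations. The STRUCT(...) projection in the join still names each nested field explicitly. With this key naming, the join condition would compare against PI.PRODUCT_KEY. The inferred column names come from the registered schema, so it is worth checking them with DESCRIBE before referencing them in the query.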