Aram Panasenco
Aram Panasenco

Reputation: 108

Processing a multi-stage request in ksqlDB

Suppose there's a stream of multi-stage requests for documents with given IDs to be retrieved from source, contrast-adjusted, OCRed, word-counted, placed in a destination directory, etc. (the exact stages can vary from request to request). The request stages have to be done in order and intermediate results in the cache should be reused (don't want to keep OCRing the same document over and over). I'd like to create a stream of requests for the individual applications (document retrieval app, OCR app, etc.). How can I do that with ksqlDB?

Upvotes: 0

Views: 55

Answers (1)

Aram Panasenco
Aram Panasenco

Reputation: 108

Let's represent both requests and products as arrays of steps it takes to create them. In the worst case, assume each step can contain custom information, so then the representation should be of the type ARRAY<MAP<STRING,STRING>>. Using an array as a key is not currently supported in ksqlDB, but we can 'cheat' by using the array cast as string as the key.

Let's break down the logic first: - Any new request should trigger the requests of any missing prerequisites. - Any new request that matches an existing product should be silently ignored. - Any new request without a matching product but with a matching prerequisite product should be forwarded to the application tasks stream. - Any new request with neither a matching product nor a matching prerequisite product should trigger the creation of a new request for the prerequisite product. - Any new product that is the prerequisite of a request should trigger the request to be forwarded to the application tasks stream.

First, let's put together our request streams and tables.

CREATE STREAM requests (stages ARRAY<MAP<STRING,STRING>>) WITH (kafka_topic='requests', value_format='json', partitions=1);

CREATE STREAM keyed_requests WITH (kafka_topic='keyed_requests', value_format='json', partitions=1) AS
SELECT
  CAST(stages AS STRING) AS request_id,
  CAST(SLICE(stages,1,ARRAY_LENGTH(stages)-1) AS STRING) AS prereq_id,
  stages,
  SLICE(stages,1,ARRAY_LENGTH(stages)-1) AS prereq
FROM requests
PARTITION BY CAST(stages AS STRING);

CREATE TABLE tbl_requests (ROWKEY STRING PRIMARY KEY, request_id STRING, prereq_id STRING, stages ARRAY<MAP<STRING,STRING>>, prereq ARRAY<MAP<STRING,STRING>>) WITH (kafka_topic='keyed_requests', value_format='json');

CREATE TABLE tbl_requests_copy AS SELECT * FROM tbl_requests;

CREATE TABLE prereq_requests AS SELECT prereq_id, COLLECT_SET(request_id) AS requests FROM keyed_requests GROUP BY prereq_id;

INSERT INTO requests
SELECT
  keyed_requests.prereq AS stages
FROM keyed_requests
LEFT JOIN tbl_requests_copy ON keyed_requests.prereq_id = tbl_requests_copy.ROWKEY
WHERE tbl_requests_copy.ROWKEY IS NULL;

Next, let's create the stream/table of completed products:

CREATE STREAM products (ROWKEY STRING KEY) WITH (kafka_topic='products', value_format='json', partitions=1);
CREATE TABLE tbl_products (ROWKEY STRING PRIMARY KEY) WITH (kafka_topic='products', value_format='json');

Now, let's create a new stream for requests with completed prerequisites:

CREATE STREAM completed_prereq_requests (ROWKEY STRING KEY, request_id STRING) WITH (kafka_topic='completed_prereq_requests', value_format='json', partitions=1);

Let's insert new requests into the stream:

INSERT INTO completed_prereq_requests
SELECT
  request_id
FROM keyed_requests
LEFT JOIN tbl_products ON keyed_requests.prereq_id = tbl_products.ROWKEY
WHERE keyed_requests.prereq_id = '[]' OR tbl_products.ROWKEY IS NOT NULL
EMIT CHANGES;

Let's also insert completed products into the stream:

INSERT INTO completed_prereq_requests
SELECT
  EXPLODE(prereq_requests.requests) AS request_id
FROM products
INNER JOIN prereq_requests ON products.ROWKEY = prereq_requests.ROWKEY
EMIT CHANGES;

Now we have a stream of IDs of requests with completed prerequisites, but it still includes requests that were already completed themselves. In addition, the request arrays themselves are not in this stream. Let's filter out completed requests and enrich with array objects in a new stream:

CREATE STREAM tasks AS
SELECT
  tbl_requests.stages AS stages
FROM completed_prereq_requests
LEFT JOIN tbl_requests ON completed_prereq_requests.request_id = tbl_requests.ROWKEY
LEFT JOIN tbl_products ON completed_prereq_requests.request_id = tbl_products.ROWKEY
WHERE tbl_products.ROWKEY IS NULL
PARTITION BY completed_prereq_requests.request_id;

To test this out, select the tasks stream in one ksqlDB window with SELECT * FROM tasks EMIT CHANGES;. Then open another ksqlDB window and create a request and start producing products. Watch what happens on the other screen.

INSERT INTO requests (stages) VALUES (ARRAY[MAP('p':='x'),MAP('r':='y')]);
INSERT INTO products (ROWKEY) VALUES (CAST(ARRAY[MAP('p':='x')] AS STRING));
INSERT INTO requests (stages) VALUES (ARRAY[MAP('p':='x'),MAP('r':='y2')]);
INSERT INTO requests (stages) VALUES (ARRAY[MAP('p1':='x1','p2':='x2'),MAP('r':='y'),MAP('n':='z')]);
INSERT INTO products (ROWKEY) VALUES (CAST(ARRAY[MAP('p1':='x1','p2':='x2')] AS STRING));
INSERT INTO requests (stages) VALUES (ARRAY[MAP('p1':='x1','p2':='x2'),MAP('r':='y'),MAP('n':='z2')]);
INSERT INTO products (ROWKEY) VALUES (CAST(ARRAY[MAP('p1':='x1','p2':='x2'),MAP('r':='y')] AS STRING));

Upvotes: 1

Related Questions