pcb

Reputation: 63

KsqlDB Converting from TextNode to Array

TL;DR: In ksqlDB, is there a way to convert a TextNode into an Array<VARCHAR> so an EXPLODE can be performed without error?

Brand new to ksqlDB and running into an odd issue. I'm ETLing off Debezium -> ksqlDB and the data is flowing, which is great. The problem is, when I go to use the EXPLODE function it fails to parse, since what I want to be an ARRAY is actually a TextNode. Here is a simplified data structure coming off Postgres, where data is a JSONB column inside Postgres:

{
  "id": "b5b55e07-15d7-4559-8319-18a67205ea4d",
  "data": [
        "d728fef0-9eec-4dec-b9b6-04b5444431f6",
        "7a475d25-ec73-41c3-9fbc-0a62e96d887a"
  ]
}

My debezium connector is using KSQL_CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
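
Because the JSONB column goes through the JsonConverter, the topic value carries data as a single escaped JSON string rather than a JSON array (a simplified sketch of the value, based on the print output below):

{
  "id": "b5b55e07-15d7-4559-8319-18a67205ea4d",
  "data": "[\"d728fef0-9eec-4dec-b9b6-04b5444431f6\", \"7a475d25-ec73-41c3-9fbc-0a62e96d887a\"]"
}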

Setup and outputs (the topic output includes extra fields that I ignore):

SET 'auto.offset.reset' = 'earliest';

CREATE SOURCE CONNECTOR forms_reader WITH (
    'connector.class' = 'io.debezium.connector.postgresql.PostgresConnector',
    'database.hostname' = 'db',
    'database.port' = '5432',
    'database.user' = 'master',
    'database.password' = 'secret',
    'database.dbname' = 'forms',
    'database.server.name' = 'forms',
    'table.whitelist' = 'public.response_version',
    'transforms' = 'unwrap',
    'transforms.unwrap.type' = 'io.debezium.transforms.ExtractNewRecordState',
    'transforms.unwrap.drop.tombstones' = 'false',
    'transforms.unwrap.delete.handling.mode' = 'rewrite'
);

print 'forms.public.response_version' from beginning;
Key format: HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2020/12/17 21:42:29.879 Z, key: [Struct{id=b5b55e07-15d7-4559-8319-18a67@3616449210317300861/-], value: {"id":"b5b55e07-15d7-4559-8319-18a67205ea4d","response_id":"403fc75f-97fa-4f06-9f66-bebff6b458c7","data":"[\"d728fef0-9eec-4dec-b9b6-04b5444431f6\", \"7a475d25-ec73-41c3-9fbc-0a62e96d887a\"]","created_by":"b5166a61-71bb-4e50-b445-afcc64d46b5e","created_at":1608217959510550,"previous_response_version_id":null,"form_version_id":"6b4f9c86-4984-4d05-9e7e-8e51b97189b3","__deleted":"false"}

Create streams:

CREATE STREAM response_versions(
    id VARCHAR KEY, 
    data ARRAY<String>)
    WITH(kafka_topic='forms.public.response_version', value_format='JSON');

CREATE STREAM response_fields
    WITH(value_format='JSON')
    AS SELECT id AS rv_id,
           EXPLODE(data) AS data_field
        FROM response_versions EMIT CHANGES;

Error in logs on second stream creation:

org.apache.kafka.common.errors.SerializationException: Failed to deserialize value from topic: forms.public.response_version. Can't convert type. sourceType: TextNode, requiredType: ARRAY<VARCHAR>, path: $.DATA
Caused by: io.confluent.ksql.serde.json.KsqlJsonDeserializer$CoercionException: Can't convert type. sourceType: TextNode, requiredType: ARRAY<VARCHAR>, path: $.DATA

Does anyone have a solution to this? Ideally I'd like to get the TextNode into an Array, but I'm not sure how.

Thanks, Patrick

Upvotes: 0

Views: 1409

Answers (2)

lkaupp

Reputation: 551

Just in case someone stumbles over this and wants to do the same with an Avro schema, i.e. parse a TextNode into an Avro ARRAY: here is an example with BOOLEAN; replace it with the type you need:

CAST(REGEXP_SPLIT_TO_ARRAY(REGEXP_REPLACE(value, '\\[|\\]', ''), '\\s*,\\s*') AS ARRAY<BOOLEAN>)

because it is possible to use CAST to cast an array to an array. Here is a built-in query example: create a stream and convert string boolean arrays to boolean arrays. I have a dtype field in my stream which tells the data type, but the values may also include non-arrays, so I use INSTR to get only the arrays into the transformation:

CREATE STREAM array_stream AS
    SELECT rowkey, date, nodeid, dtype,
           CAST(REGEXP_SPLIT_TO_ARRAY(REGEXP_REPLACE(value, '\\[|\\]', ''), '\\s*,\\s*') AS ARRAY<BOOLEAN>) AS value_transformed
    FROM stream_defined_on_topic
    WHERE INSTR(value, ']') > 0 AND dtype = 'Boolean';
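
The same pattern should carry over to other element types by changing the CAST target; for example, for integer arrays (my assumption, following the same pattern; not something I have tested):

CAST(REGEXP_SPLIT_TO_ARRAY(REGEXP_REPLACE(value, '\\[|\\]', ''), '\\s*,\\s*') AS ARRAY<INTEGER>)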

Now it is possible to create a table on top of the defined stream with:

CREATE TABLE array_stream_table AS
    SELECT nodeid, COLLECT_LIST(dtype), COLLECT_LIST(ARRAY_JOIN(value_transformed))
    FROM array_stream
    WINDOW TUMBLING (SIZE 1 SECONDS)
    GROUP BY nodeid;

Hopefully this clarifies the other answer for Avro schemas as well. Took me a while to figure it out.

Upvotes: 0

Sergio

Reputation: 81

The error is caused by the response_versions stream, which cannot read the string into an array. To do the conversion, you'll have to declare that stream to read a string, and then convert the string to an array.

For the string -> array conversion, I had to remove the brackets with REGEXP_REPLACE, then split the string into an array with REGEXP_SPLIT_TO_ARRAY.

CREATE STREAM response_versions(
    id VARCHAR KEY, 
    data STRING)
    WITH(kafka_topic='forms.public.response_version', value_format='JSON');
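
You can sanity-check that the stream now reads the raw string (a quick check I'd run, assuming a recent ksqlDB that supports LIMIT on push queries):

SELECT id, data FROM response_versions EMIT CHANGES LIMIT 2;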

CREATE STREAM response_fields
    WITH(value_format='JSON')
    AS SELECT 
       id AS rv_id, 
       EXPLODE(REGEXP_SPLIT_TO_ARRAY(REGEXP_REPLACE(data, '\\[|\\]', ''), '\\s*,\\s*')) AS data_field
    FROM response_versions EMIT CHANGES;
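
With the sample record from the question, the exploded stream should emit one row per array element, roughly like this (a sketch of the expected shape, not captured output):

{"RV_ID":"b5b55e07-15d7-4559-8319-18a67205ea4d","DATA_FIELD":"d728fef0-9eec-4dec-b9b6-04b5444431f6"}
{"RV_ID":"b5b55e07-15d7-4559-8319-18a67205ea4d","DATA_FIELD":"7a475d25-ec73-41c3-9fbc-0a62e96d887a"}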

Upvotes: 2
