Reputation: 63
TL;DR: In ksqlDB, is there a way to convert a TextNode into an Array<VARCHAR> so an EXPLODE can be performed without error?
Brand new to ksqlDB and running into an odd issue. I'm ETLing off Debezium -> ksqlDB and the data is flowing, which is great. The problem is, when I go to use the EXPLODE function, it fails to parse, since what I want to be an ARRAY is actually a TextNode. Here is a simplified data structure coming off Postgres, where data is a JSONB column in Postgres:
{
"id": "b5b55e07-15d7-4559-8319-18a67205ea4d",
"data": [
"d728fef0-9eec-4dec-b9b6-04b5444431f6",
"7a475d25-ec73-41c3-9fbc-0a62e96d887a"
]
}
My debezium connector is using KSQL_CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
Setup and outputs (the topic output has extra fields that I ignore):
SET 'auto.offset.reset' = 'earliest';
CREATE SOURCE CONNECTOR forms_reader WITH (
'connector.class' = 'io.debezium.connector.postgresql.PostgresConnector',
'database.hostname' = 'db',
'database.port' = '5432',
'database.user' = 'master',
'database.password' = 'secret',
'database.dbname' = 'forms',
'database.server.name' = 'forms',
'table.whitelist' = 'public.response_version',
'transforms' = 'unwrap',
'transforms.unwrap.type' = 'io.debezium.transforms.ExtractNewRecordState',
'transforms.unwrap.drop.tombstones' = 'false',
'transforms.unwrap.delete.handling.mode' = 'rewrite'
);
print 'forms.public.response_version' from beginning;
Key format: HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2020/12/17 21:42:29.879 Z, key: [Struct{id=b5b55e07-15d7-4559-8319-18a67@3616449210317300861/-], value: {"id":"b5b55e07-15d7-4559-8319-18a67205ea4d","response_id":"403fc75f-97fa-4f06-9f66-bebff6b458c7","data":"[\"d728fef0-9eec-4dec-b9b6-04b5444431f6\", \"7a475d25-ec73-41c3-9fbc-0a62e96d887a\"]","created_by":"b5166a61-71bb-4e50-b445-afcc64d46b5e","created_at":1608217959510550,"previous_response_version_id":null,"form_version_id":"6b4f9c86-4984-4d05-9e7e-8e51b97189b3","__deleted":"false"}
Create streams:
CREATE STREAM response_versions(
id VARCHAR KEY,
data ARRAY<String>)
WITH(kafka_topic='forms.public.response_version', value_format='JSON');
CREATE STREAM response_fields
WITH(value_format='JSON')
AS SELECT id AS rv_id,
EXPLODE(data) AS data_field
FROM response_versions EMIT CHANGES;
Error in logs on second stream creation:
org.apache.kafka.common.errors.SerializationException: Failed to deserialize value from topic: forms.public.response_version. Can't convert type. sourceType: TextNode, requiredType: ARRAY<VARCHAR>, path: $.DATA
Caused by: io.confluent.ksql.serde.json.KsqlJsonDeserializer$CoercionException: Can't convert type. sourceType: TextNode, requiredType: ARRAY<VARCHAR>, path: $.DATA
Does anyone have a solution to this? Ideally I'd like to get the TextNode into an Array, but I'm not sure how.
Thanks, Patrick
Upvotes: 0
Views: 1409
Reputation: 551
In case someone stumbles over this wanting to do the same with an Avro schema, i.e. parse a TextNode into an Avro ARRAY: here is an example with BOOLEAN; replace it with the type you need:
cast(REGEXP_SPLIT_TO_ARRAY(REGEXP_REPLACE(value, '\\[|\\]', ''), '\\s*,\\s*') as array<boolean>)
This works because CAST can convert one array type to another. Here is a complete query example: create a stream and convert string boolean arrays to boolean arrays. I have a dtype field in my stream which tells the data type, but the value may also contain non-arrays, so I use INSTR to pass only the arrays into the transformation:
create stream array_stream as SELECT rowkey, date, nodeid, dtype, cast(REGEXP_SPLIT_TO_ARRAY(REGEXP_REPLACE(value, '\\[|\\]', ''), '\\s*,\\s*') as array<boolean>) as value_transformed FROM stream_defined_on_topic where instr(value, ']')>0 and dtype='Boolean';
Now it is possible to create a table on top of the defined stream with:
create table array_stream_table as select nodeid, collect_list(dtype), collect_list(array_join(value_transformed)) from array_stream window tumbling (size 1 seconds) group by nodeid;
Hopefully this clarifies the answer above for the Avro schema case as well. It took me a while to figure out.
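To sanity-check what the ksqlDB expression computes, here is a rough Python sketch of the same steps (the function name is invented for illustration; ksqlDB's actual CAST semantics may differ in edge cases):

```python
import re

def cast_string_to_bool_array(value: str) -> list[bool]:
    # REGEXP_REPLACE(value, '\[|\]', ''): drop the surrounding brackets
    stripped = re.sub(r'\[|\]', '', value)
    # REGEXP_SPLIT_TO_ARRAY(..., '\s*,\s*'): split on commas with optional whitespace
    parts = re.split(r'\s*,\s*', stripped)
    # CAST(... AS ARRAY<BOOLEAN>): coerce each element
    return [p.strip().lower() == 'true' for p in parts]

print(cast_string_to_bool_array('[true, false, true]'))  # → [True, False, True]
```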
Upvotes: 0
Reputation: 81
The error is caused by the response_versions stream, which cannot read the string into an array. To do the conversion, you'll have to create that stream to read a STRING, and then convert the string to an array.
For the string -> array conversion, I had to remove the brackets with REGEXP_REPLACE, then split the string into an array with REGEXP_SPLIT_TO_ARRAY.
CREATE STREAM response_versions(
id VARCHAR KEY,
data STRING)
WITH(kafka_topic='forms.public.response_version', value_format='JSON');
CREATE STREAM response_fields
WITH(value_format='JSON')
AS SELECT
id AS rv_id,
EXPLODE(REGEXP_SPLIT_TO_ARRAY(REGEXP_REPLACE(data, '\\[|\\]', ''), '\\s*,\\s*')) AS data_field
FROM response_versions EMIT CHANGES;
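For anyone who wants to see exactly what the two regex calls produce, here is a small Python sketch of the same transformation (the function name is made up for illustration). One thing to notice: since the source string is serialized JSON, each split element keeps its surrounding double quotes, so a further REGEXP_REPLACE on '"' may be needed if you want the bare UUIDs:

```python
import re

def string_to_array(data: str) -> list[str]:
    """Mirror the ksqlDB expression: REGEXP_REPLACE strips the JSON
    brackets, REGEXP_SPLIT_TO_ARRAY splits on comma plus whitespace."""
    stripped = re.sub(r'\[|\]', '', data)   # REGEXP_REPLACE(data, '\[|\]', '')
    return re.split(r'\s*,\s*', stripped)   # REGEXP_SPLIT_TO_ARRAY(..., '\s*,\s*')

raw = '["d728fef0-9eec-4dec-b9b6-04b5444431f6", "7a475d25-ec73-41c3-9fbc-0a62e96d887a"]'
print(string_to_array(raw))
# each element still carries its double quotes:
# ['"d728fef0-9eec-4dec-b9b6-04b5444431f6"', '"7a475d25-ec73-41c3-9fbc-0a62e96d887a"']
```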
Upvotes: 2