Reputation: 305
{
  "event": {
    "header": {
      "name": "abc",
      "version": "1.0",
      "producer": "123",
      "channel": "lab",
      "countryCode": "US"
    },
    "body": {
      "customerIdentifiers": [
        { "customerIdentifier": "1234", "customerIdType": "cc" },
        { "customerIdentifier": "234", "customerIdType": "id" }
      ],
      "accountIdentifiers": [
        { "accountIdentifier": "123", "accountIdType": "no" },
        { "accountIdentifier": "Primary", "accountIdType": "da" }
      ],
      "eventDetails": {
        "transactionDateTime": "2019-03-26 05:28:13.000",
        "transactionDate": "2019-03-26",
        "monthAverage": "188",
        "dailyAverage": "7"
      }
    }
  }
}
I created a stream for the above JSON:
CREATE STREAM STREAM_NAME(
  event STRUCT<
    header STRUCT<
      name VARCHAR,
      version VARCHAR,
      producer VARCHAR,
      channel VARCHAR,
      countryCode VARCHAR,
      eventTimeStamp VARCHAR
    >,
    body STRUCT<
      customerIdentifiers STRUCT<
        customerIdentifier VARCHAR,
        customerIdType VARCHAR
      >,
      accountIdentifiers STRUCT<
        accountIdentifier VARCHAR,
        accountIdType VARCHAR
      >,
      eventDetails STRUCT<
        transactionDateTime VARCHAR,
        transactionDate VARCHAR,
        productDescription VARCHAR,
        monthAverage VARCHAR,
        dailyAverage VARCHAR
      >
    >
  >
) WITH (
  KAFKA_TOPIC = 'TOPIC1',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 1
);
I am unable to read messages from the stream:
select * from STREAM_NAME emit changes;
Any suggestions please?
Upvotes: 0
Views: 256
Reputation: 1893
You're likely running into deserialization errors because the schema of the stream you've created does not match the schema of your data.
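A quick way to confirm this (just a sketch, assuming the topic really is TOPic1 as named TOPIC1 in your DDL and that the ksqlDB processing log is enabled with its default KSQL_PROCESSING_LOG stream) is to compare the raw records on the topic with what ksqlDB reports:
-- Inspect a few raw records straight off the topic:
PRINT 'TOPIC1' FROM BEGINNING LIMIT 3;
-- If the processing log is enabled, deserialization failures show up here:
SELECT * FROM KSQL_PROCESSING_LOG EMIT CHANGES;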
Upon reformatting your example data and SQL, the errors become more apparent:
- The $.event.body.customerIdentifiers element is an array of structs, but the DDL defines it as a struct.
- The $.event.body.accountIdentifiers element is an array of structs, but the DDL defines it as a struct.
DDL that should work is:
CREATE STREAM STREAM_NAME(
  event STRUCT<
    header STRUCT<
      name VARCHAR,
      version VARCHAR,
      producer VARCHAR,
      channel VARCHAR,
      countryCode VARCHAR,
      eventTimeStamp VARCHAR
    >,
    body STRUCT<
      customerIdentifiers ARRAY<STRUCT<
        customerIdentifier VARCHAR,
        customerIdType VARCHAR
      >>,
      accountIdentifiers ARRAY<STRUCT<
        accountIdentifier VARCHAR,
        accountIdType VARCHAR
      >>,
      eventDetails STRUCT<
        transactionDateTime VARCHAR,
        transactionDate VARCHAR,
        productDescription VARCHAR,
        monthAverage VARCHAR,
        dailyAverage VARCHAR
      >
    >
  >
) WITH (
  KAFKA_TOPIC = 'TOPIC1',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 1
);
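With the ARRAY types in place, the nested fields can be pulled out with the -> struct-dereference operator and array indexing. A sketch against the corrected stream (column aliases are just illustrative; note that ksqlDB array indices start at 1):
-- Flatten a few of the nested fields:
SELECT
  event->header->name AS event_name,
  event->body->customerIdentifiers[1]->customerIdentifier AS first_customer_id,
  event->body->eventDetails->transactionDate AS txn_date
FROM STREAM_NAME
EMIT CHANGES;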
Also note the data does not contain the following fields, which are in the DDL (though this shouldn't cause any issues, as the fields will just be NULL):
$.event.header.eventTimeStamp
$.event.body.eventDetails.productDescription
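Selecting them simply returns NULL rather than failing, for example:
-- Both fields are absent from the payload, so they come back as NULL:
SELECT
  event->header->eventTimeStamp,
  event->body->eventDetails->productDescription
FROM STREAM_NAME
EMIT CHANGES;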
Upvotes: 1