Reputation: 25
I am sending messages from Kafka to Flink in Python. I have two different JSON roots in one Kafka topic. Example messages for each root:
1- {"Message1": {"b": "c"}}
2- {"Message2": {"e": "f"}}
Flink can consume these messages, but I cannot parse them with this DDL:
CREATE TABLE audienceInput (
`messageKey` VARBINARY,
`message` VARBINARY,
`topic` VARCHAR
) WITH (
'connector' = 'kafka',
'topic' = 'mytopic',
'properties.bootstrap.servers' = '****:9092',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'json'
)
"""
How can I parse these two message roots in the Flink DDL?
Upvotes: 0
Views: 130
Reputation: 7208
You can declare the two Message1 and Message2 fields directly in the table schema.
CREATE TABLE audience_input (
  Message1 ROW(b STRING),
  Message2 ROW(e STRING),
  ...
) WITH (
  ...
  'value.format' = 'json'
)
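For reference, a complete version of that statement might look like the following sketch; it reuses the connector options from the question, with the topic name and bootstrap servers left as the question's placeholders:
CREATE TABLE audience_input (
  `Message1` ROW(b STRING),
  `Message2` ROW(e STRING)
) WITH (
  'connector' = 'kafka',
  'topic' = 'mytopic',
  'properties.bootstrap.servers' = '****:9092',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'json'
);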
They will then appear as columns with nested fields in the resulting table.
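Since each Kafka message contains only one of the two roots, the column for the other root is simply NULL on that row (the JSON format's default 'json.fail-on-missing-field' = 'false' tolerates the missing field). The nested fields can then be read with dot notation; the aliases below are just illustrative:
SELECT
  Message1.b AS message1_b,
  Message2.e AS message2_e
FROM audience_input;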
See the JSON format documentation for more details.
Upvotes: 1