Reputation: 21
CREATE TABLE user_log (
a STRING,
b STRING
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'test',
'connector.properties.0.key' = 'zookeeper.connect',
'connector.properties.0.value' = '',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = '',
'update-mode' = 'append',
'format.type' = 'json',
'format.derive-schema' = 'true',
'format.fail-on-missing-field' = 'false'
);
The correct format is {"a":1,"b":2}, but Kafka sent malformed data: AABB, and the program stops. How can I filter out badly formatted JSON in SQL?
Upvotes: 1
Views: 932
Reputation: 43612
In Flink 1.11 (about to be released), these format options have been added (both default to false). See FLINK-17663.
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'false',
I'm not sure what you're meant to do in earlier releases.
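Applied to the table from the question, a minimal Flink 1.11 DDL might look like the sketch below. The new-style option keys ('connector' = 'kafka', 'topic', 'properties.bootstrap.servers') and the localhost broker address are assumptions; 'json.ignore-parse-errors' is set to 'true' here since that is what skips bad rows like AABB.

```sql
-- Sketch for Flink 1.11's new Kafka connector option style (assumed);
-- the broker address is a placeholder.
CREATE TABLE user_log (
  a STRING,
  b STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'test',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'  -- silently drop rows that fail to parse
);
```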
Upvotes: 3
Reputation: 3874
When defining the source configuration for the table, you can set something like:
'format.ignore-parse-errors' = 'true', -- optional: skip fields and rows with parse errors instead of failing;
which should do exactly what you want.
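For reference, this is the question's legacy-style DDL with that option added; the connection values are left empty as in the original, and whether this option is available depends on the Flink version in use.

```sql
-- Sketch: the original DDL plus 'format.ignore-parse-errors'
CREATE TABLE user_log (
  a STRING,
  b STRING
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'test',
  'connector.properties.0.key' = 'zookeeper.connect',
  'connector.properties.0.value' = '',
  'connector.properties.1.key' = 'bootstrap.servers',
  'connector.properties.1.value' = '',
  'update-mode' = 'append',
  'format.type' = 'json',
  'format.derive-schema' = 'true',
  'format.fail-on-missing-field' = 'false',
  'format.ignore-parse-errors' = 'true'  -- skip rows with parse errors instead of failing
);
```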
Upvotes: 0