Zheng

Reputation: 21

Flink SQL: How to filter out malformed JSON in SQL?

CREATE TABLE user_log (
    a STRING,
    b STRING
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'test',
    'connector.properties.0.key' = 'zookeeper.connect',
    'connector.properties.0.value' = '',
    'connector.properties.1.key' = 'bootstrap.servers',
    'connector.properties.1.value' = '',
    'update-mode' = 'append',
    'format.type' = 'json',
    'format.derive-schema' = 'true',
    'format.fail-on-missing-field' = 'false'
);

The correct format is {"a":1,"b":2}, but Kafka sent a malformed record: AABB. When that happens, the program stops. How can I filter out malformed JSON in SQL?

Upvotes: 1

Views: 932

Answers (2)

David Anderson

Reputation: 43612

In Flink 1.11 (about to be released) these format options have been added (and both default to false). See FLINK-17663.

'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'false',   -- set to 'true' to skip records that cannot be parsed
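
For context, here is a minimal sketch of how the table from the question might look in the Flink 1.11 option style. The broker address and consumer group are placeholders (the question leaves those values empty), and the shortened connector keys assume the new 1.11 Kafka connector rather than the legacy 'connector.type' properties.

```sql
CREATE TABLE user_log (
    a STRING,
    b STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'test',
    'properties.bootstrap.servers' = 'localhost:9092',  -- placeholder, not from the question
    'properties.group.id' = 'user_log_reader',          -- hypothetical consumer group name
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'   -- skip unparseable records such as AABB instead of failing
);
```

With 'json.ignore-parse-errors' set to 'true', a record like AABB should be skipped rather than stopping the job. As far as I know, the two options should not both be set to 'true' at the same time.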

I'm not sure what you're meant to do in earlier releases.

Upvotes: 3

Dominik Wosiński

Reputation: 3874

When defining the configuration for the table, you can set something like:

'format.ignore-parse-errors' = 'true',    -- optional: skip fields and rows with parse errors instead of failing

which should do exactly what you want.
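
Applied to the table from the question, that might look like the sketch below. It assumes a Flink release whose legacy JSON format recognizes the 'format.ignore-parse-errors' key; the empty connection values are left exactly as they appear in the question.

```sql
CREATE TABLE user_log (
    a STRING,
    b STRING
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'test',
    'connector.properties.0.key' = 'zookeeper.connect',
    'connector.properties.0.value' = '',
    'connector.properties.1.key' = 'bootstrap.servers',
    'connector.properties.1.value' = '',
    'update-mode' = 'append',
    'format.type' = 'json',
    'format.derive-schema' = 'true',
    'format.fail-on-missing-field' = 'false',
    'format.ignore-parse-errors' = 'true'   -- assumption: supported by the Flink version in use
);
```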

Upvotes: 0
