Rinze

Reputation: 862

Flink failed to deserialize JSON produced by Debezium

I'm trying to use Flink to consume the change event log produced by Debezium. The JSON was this:

{
    "schema":{

    },
    "payload":{
        "before":null,
        "after":{
            "team_config_id":3800,
            "team_config_team_id":"team22bcb26e-499a-41e6-8746-b7d980e79e04",
            "team_config_sfdc_account_id":null,
            "team_config_sfdc_account_url":null,
            "team_config_business_type":5,
            "team_config_dpsa_status":0,
            "team_config_desc":null,
            "team_config_company_id":null,
            "team_config_hm_count_stages":null,
            "team_config_assign_credits_times":null,
            "team_config_real_renew_date":null,
            "team_config_action_date":null,
            "team_config_last_action_date":null,
            "team_config_business_tier_notification":"{}",
            "team_config_create_date":1670724933000,
            "team_config_update_date":1670724933000,
            "team_config_rediscovery_tier":0,
            "team_config_rediscovery_tier_notification":"{}",
            "team_config_sfdc_industry":null,
            "team_config_sfdc_market_segment":null,
            "team_config_unterminated_note_id":0
        },
        "source":{

        },
        "op":"c",
        "ts_ms":1670724933149,
        "transaction":null
    }
}

And I've tried two ways to declare the input schema.

The first way was to directly parse the JSON data:

create table team_config_source (
      `payload` ROW <
        `after` ROW <
          ...
          team_config_create_date timestamp(3),
          team_config_update_date timestamp(3),
          ...
        >
      >
    ) WITH (
    'connector' = 'kafka',
    ...
    'format' = 'json'
    )

But Flink throws an error, `org.apache.flink.formats.json.JsonToRowDataConverters$JsonParseException: Fail to deserialize at field: team_config_create_date`, caused by `java.time.format.DateTimeParseException: Text '1670724933000' could not be parsed at index 0`. Doesn't Flink support timestamps in this format?

I've also tried another way, using the built-in debezium format:

create table team_config_source (
      team_config_create_id int,
      ...
    ) WITH (
    'connector' = 'kafka',
    ...
    'format' = 'debezium-json'
    )

But Flink came up with another error, `java.io.IOException: Corrupt Debezium JSON message`, caused by a `java.lang.NullPointerException`. I found somebody saying that an update event shouldn't have null as its before value, but this message is a create event.

Could anyone help to check my DDL?

Upvotes: 0

Views: 1459

Answers (2)

Shane Keller

Reputation: 1

This article might help; it suggests adding 'debezium-json.schema-include' = 'true' to the connector's WITH clause.
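If the schema/payload envelope is indeed the cause of the "Corrupt Debezium JSON message" error, the DDL would look something like this sketch (column list abridged; the topic and bootstrap-server values are placeholders):

```sql
-- Debezium's default envelope wraps each event in a schema/payload pair.
-- Setting debezium-json.schema-include tells Flink the schema is present,
-- so it unwraps the payload instead of treating the message as corrupt.
CREATE TABLE team_config_source (
  team_config_id INT,
  team_config_team_id STRING
  -- ... remaining columns ...
) WITH (
  'connector' = 'kafka',
  'topic' = '...',                          -- placeholder
  'properties.bootstrap.servers' = '...',   -- placeholder
  'format' = 'debezium-json',
  'debezium-json.schema-include' = 'true'
);
```

Note that this option only helps when Debezium is actually configured to embed the schema (its default); if schemas are disabled on the Debezium side, the option should stay false.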

Upvotes: 0

Duy Nguyen

Reputation: 1005

I am not a Flink expert, but TIMESTAMP in Flink is not epoch time; it expects a datetime format.

In this case you can define the table like:

team_config_create_bigint BIGINT,
team_config_update_bigint BIGINT,
...
-- FROM_UNIXTIME expects seconds, but Debezium emits epoch milliseconds,
-- so divide by 1000 first
team_config_create_date AS TO_TIMESTAMP(FROM_UNIXTIME(team_config_create_bigint / 1000)),
team_config_update_date AS TO_TIMESTAMP(FROM_UNIXTIME(team_config_update_bigint / 1000))
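A fuller sketch of this idea for the question's nested layout: with the plain 'json' format, physical columns map to JSON keys by name, so the epoch fields have to keep their original names inside the ROW, and the readable timestamps are derived as computed columns (the `_ts` column names here are hypothetical, and the connector options are placeholders):

```sql
-- Physical columns mirror the JSON structure and keys exactly; the
-- epoch-millis fields are declared BIGINT so deserialization succeeds,
-- and TIMESTAMPs are derived from them as computed columns.
CREATE TABLE team_config_source (
  `payload` ROW<
    `after` ROW<
      team_config_id INT,
      team_config_create_date BIGINT,
      team_config_update_date BIGINT
      -- ... remaining columns ...
    >
  >,
  -- FROM_UNIXTIME takes seconds; Debezium emits milliseconds
  team_config_create_ts AS TO_TIMESTAMP(FROM_UNIXTIME(`payload`.`after`.team_config_create_date / 1000)),
  team_config_update_ts AS TO_TIMESTAMP(FROM_UNIXTIME(`payload`.`after`.team_config_update_date / 1000))
) WITH (
  'connector' = 'kafka',
  'topic' = '...',                          -- placeholder
  'properties.bootstrap.servers' = '...',   -- placeholder
  'format' = 'json'
);
```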

Upvotes: 1
