Reputation: 862
I'm executing a SQL in Flink looks like this:
create table team_config_source (
`payload` ROW(
`before` ROW(
team_config_id int,
...
),
`after` ROW(
team_config_id int,
...
)
),
PRIMARY KEY (`payload`.`after`.`team_config_id`) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'xxx',
'properties.bootstrap.servers' = 'xxx',
'properties.group.id' = 'xxx',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'key.format' = 'json'
)
But Flink give me this error:
org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "." at line 51, column 29.
Was expecting one of:
")" ...
"," ...
I've also tried to replace (`payload`.`after`.`team_config_id`)
with (`payload.after.team_config_id`)
, but Flink will say that column payload.after.team_config_id was not defined
.
How should I correct my DDL?
Upvotes: 1
Views: 833