user3822232

Reputation: 315

Using CTE in Apache Flink SQL

I am trying to write a SQL statement that uses a CTE in Flink.

I have a table defined as:

CREATE TABLE test_cte
    (
        pod                     VARCHAR,
        PRIMARY KEY (pod) NOT ENFORCED
    ) WITH (
          'connector' = 'upsert-kafka',
          'topic' = 'test_cte',
          'properties.bootstrap.servers' = 'localhost:29092,localhost:39092',
          'properties.group.id' = 'test_cte_group_id',
          'value.format' = 'json',
          'key.format' = 'json',
          'properties.allow.auto.create.topics' = 'true',
          'properties.replication.factor' = '3',
          'value.json.timestamp-format.standard' = 'ISO-8601',
          'sink.parallelism' = '3'
          );

Then I have this insert statement:

WITH q1 AS ( SELECT pod FROM source ) 
 FROM q1
INSERT OVERWRITE TABLE test_cte
SELECT pod;

I get this error: org.apache.flink.sql.parser.impl.ParseException: Incorrect syntax near the keyword 'FROM' at line 2, column 2.

The source table has the column pod.

When I run just the select, as here:

WITH q1 AS ( SELECT pod FROM roles_deleted_raw_v1)
select * from q1;

I can see the result.

Upvotes: 1

Views: 374

Answers (1)

xjmdoo

Reputation: 1736

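The CTE form used in the insert (WITH ... FROM q1 INSERT OVERWRITE TABLE ... SELECT) is Hive syntax, so it is only available in Flink when using the Hive dialect: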

SET table.sql-dialect = hive;

However, this feature is only supported with the HiveCatalog, so it cannot be used with an upsert-kafka table.

For more info on this, you can check out the Flink docs.
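
For comparison, here is a minimal sketch of how the same insert might be written in the default dialect, where the standalone WITH ... SELECT from the question already runs. It assumes the Flink version in use accepts a WITH clause in the query part of an INSERT statement, which is worth checking against the docs for that version. The table names source and test_cte are taken from the question.

```sql
-- Default dialect sketch (assumption: the parser accepts a CTE after INSERT INTO).
-- The WITH clause is part of the query that feeds the sink, so it follows INSERT INTO
-- instead of preceding a Hive-style FROM ... INSERT ... SELECT.
INSERT INTO test_cte
WITH q1 AS (
    SELECT pod FROM source
)
SELECT pod FROM q1;
```

INSERT INTO is used here instead of INSERT OVERWRITE because, as far as I know, overwrite needs a sink that supports it, and upsert-kafka already updates rows by primary key.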

Upvotes: 2
