Andrea Lombardo

Reputation: 103

pyflink configuration error SQL parse fail

I want to make sure that my Flink setup works before moving on to more complicated usage. As the simplest possible example, I tried the following. The input file contains:

column_a,column_b
1,2

The output file exists. To get PyFlink 1.10 with Docker for my app, I use the following snippet:

  jobmanager:
    image: pyflink/playgrounds:1.10.0
    volumes:
      - ./examples:/opt/examples
    hostname: "jobmanager"
    expose:
      - "6123"
    ports:
      - "8088:8088"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
  taskmanager:
    image: pyflink/playgrounds:1.10.0
    volumes:
      - ./examples:/opt/examples
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - jobmanager:jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import StreamingFileSink
from pyflink.table import EnvironmentSettings, StreamTableEnvironment, BatchTableEnvironment
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit
import pandas as pd
from inspect import getmembers, isfunction
import os

# create a blink batch TableEnvironment
env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
table_env = BatchTableEnvironment.create(environment_settings=env_settings)
source_ddl = """
                CREATE TABLE MyUserTable (
                        column_a INT PRIMARY KEY,
                        column_b INT,
                        
                        ) WITH (
                          'connector' = 'filesystem',         
                          'path' = 'file:///Users//code/examples/input.csv ', 
                          'format' = 'csv'

                        )"""


#connector for data output/sink
sink_ddl = """
                CREATE TABLE table2 (
                            score INT)
                            WITH (
                                'connector' = 'filesystem',
                                'path' = 'file:///Users//code/examples/output.csv',
                                'format' = 'csv'
                            )"""

#make the table corresponding to the schema mentioned
source_table = table_env.execute_sql(source_ddl)
sink_table = table_env.execute_sql(sink_ddl)

#convert the sql table to table API
table_path = table_env.from_path("MyUserTable")

# execute SELECT statement
table_result2 = table_env.execute_sql("SELECT * FROM MyUserTable")
table_result2.print()

The following error is generated:

  File ".\flink1.py", line 41, in <module>
    source_table = table_env.execute_sql(source_ddl)
  File "C:\Users\landr\AppData\Local\Programs\Python\Python38\lib\site-packages\pyflink\table\table_environment.py", line 804, in execute_sql
    return TableResult(self._j_tenv.executeSql(stmt))
  File "C:\Users\landr\AppData\Local\Programs\Python\Python38\lib\site-packages\py4j\java_gateway.py", line 1285, in __call__
    return_value = get_return_value(
  File "C:\Users\landr\AppData\Local\Programs\Python\Python38\lib\site-packages\pyflink\util\exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "C:\Users\landr\AppData\Local\Programs\Python\Python38\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o10.executeSql.
: org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered ")" at line 6, column 25.     
Was expecting one of:
    "CONSTRAINT" ...
    "PRIMARY" ...
    "UNIQUE" ...
    "WATERMARK" ...
    <BRACKET_QUOTED_IDENTIFIER> ...
    <QUOTED_IDENTIFIER> ...
    <BACK_QUOTED_IDENTIFIER> ...
    <HYPHENATED_IDENTIFIER> ...
    <IDENTIFIER> ...
    <UNICODE_QUOTED_IDENTIFIER> ...

        at org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:56)
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:96)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:722)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)   
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered ")" at line 6, column 25.
Was expecting one of:
    "CONSTRAINT" ...
    "PRIMARY" ...
    "UNIQUE" ...
    "WATERMARK" ...
    <BRACKET_QUOTED_IDENTIFIER> ...
    <QUOTED_IDENTIFIER> ...
    <BACK_QUOTED_IDENTIFIER> ...
    <HYPHENATED_IDENTIFIER> ...
    <IDENTIFIER> ...
    <UNICODE_QUOTED_IDENTIFIER> ...

        at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:450) 
        at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:213)
        at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:140)
        at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:155)
        at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:180)
        at org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:54)
        ... 13 more
Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered ")" at line 6, column 25.
Was expecting one of:
    "CONSTRAINT" ...
    "PRIMARY" ...
    "UNIQUE" ...
    "WATERMARK" ...
    <BRACKET_QUOTED_IDENTIFIER> ...
    <QUOTED_IDENTIFIER> ...
    <BACK_QUOTED_IDENTIFIER> ...
    <HYPHENATED_IDENTIFIER> ...
    <IDENTIFIER> ...
    <UNICODE_QUOTED_IDENTIFIER> ...

        at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:39782)
        at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:39593)
        at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.TableColumn(FlinkSqlParserImpl.java:4835)     
        at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateTable(FlinkSqlParserImpl.java:5209)  
        at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateExtended(FlinkSqlParserImpl.java:6233)
        at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreate(FlinkSqlParserImpl.java:20934)      
        at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3415)
        at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3918)      
        at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:261)  
        at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:153)
        ... 15 more

Upvotes: 0

Views: 1510

Answers (1)

Alpha Privativum

Reputation: 21

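Remove the comma after "column_b INT". With the trailing comma, the parser expects another column definition (or a clause such as CONSTRAINT, PRIMARY, UNIQUE or WATERMARK) and instead finds the closing ")", which is exactly what the error message reports.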

This should work.
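
As a rough illustration, here is a minimal sketch of the same fix applied to the DDL string from the question. The helper strip_trailing_commas is hypothetical (it is not part of PyFlink) and the regex is deliberately naive: it would also touch a ",)" sequence inside a quoted option value, so treat it as a quick check rather than a real SQL rewriter. The column constraint and connector options are copied from the question unchanged and are not validated here.

```python
import re

# Hypothetical helper, not a PyFlink API: matches a comma that is followed
# only by whitespace (including blank lines) before a closing parenthesis.
_TRAILING_COMMA = re.compile(r",(\s*\))")


def strip_trailing_commas(ddl: str) -> str:
    """Return the DDL with any comma directly before a ')' removed."""
    return _TRAILING_COMMA.sub(r"\1", ddl)


# DDL from the question, still containing the comma after "column_b INT".
broken_ddl = """
CREATE TABLE MyUserTable (
    column_a INT PRIMARY KEY,
    column_b INT,

) WITH (
    'connector' = 'filesystem',
    'path' = 'file:///Users//code/examples/input.csv ',
    'format' = 'csv'
)"""

fixed_ddl = strip_trailing_commas(broken_ddl)
print(fixed_ddl)
```

The resulting fixed_ddl string can then be passed to table_env.execute_sql(...) in place of source_ddl. In practice it is simpler to delete the comma by hand; the helper only makes the change explicit.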

Upvotes: 2
