Reputation: 103
I want to make sure that Flink works in a basic setup before I move on to more complicated usage, so I tried the simplest possible example. The input file contains:
column_a,column_b
1,2
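For reference, the file can be recreated with a couple of lines of Python (the examples/ path is just where I keep it, matching the ./examples volume mounted into the containers below):

# recreate the sample input file; examples/ mirrors the ./examples volume mount
with open("examples/input.csv", "w") as f:
    f.write("column_a,column_b\n1,2\n")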
The output file already exists. To pull the PyFlink 1.10 playground image and run it with Docker for my app, I use the following docker-compose snippet:
jobmanager:
  image: pyflink/playgrounds:1.10.0
  volumes:
    - ./examples:/opt/examples
  hostname: "jobmanager"
  expose:
    - "6123"
  ports:
    - "8088:8088"
  command: jobmanager
  environment:
    - |
      FLINK_PROPERTIES=
      jobmanager.rpc.address: jobmanager
taskmanager:
  image: pyflink/playgrounds:1.10.0
  volumes:
    - ./examples:/opt/examples
  expose:
    - "6121"
    - "6122"
  depends_on:
    - jobmanager
  command: taskmanager
  links:
    - jobmanager:jobmanager
  environment:
    - |
      FLINK_PROPERTIES=
      jobmanager.rpc.address: jobmanager
      taskmanager.numberOfTaskSlots: 2
My PyFlink script is the following:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import StreamingFileSink
from pyflink.table import EnvironmentSettings, StreamTableEnvironment, BatchTableEnvironment
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit
import pandas as pd
from inspect import getmembers, isfunction
import os
# create a blink batch TableEnvironment
env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
table_env = BatchTableEnvironment.create(environment_settings=env_settings)
source_ddl = """
    CREATE TABLE MyUserTable (
        column_a INT PRIMARY KEY,
        column_b INT,
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'file:///Users//code/examples/input.csv ',
        'format' = 'csv'
    )"""

# connector for the data output/sink
sink_ddl = """
    CREATE TABLE table2 (
        score INT
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'file:///Users//code/examples/output.csv',
        'format' = 'csv'
    )"""
# register the source and sink tables defined by the DDL above
source_table = table_env.execute_sql(source_ddl)
sink_table = table_env.execute_sql(sink_ddl)

# get a Table API handle on the registered SQL table
table_path = table_env.from_path("MyUserTable")

# execute a SELECT statement
table_result2 = table_env.execute_sql("SELECT * FROM MyUserTable")
table_result2.print()
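Eventually the plan is to write from the source into the sink table, roughly along the lines of the sketch below (the mapping of column_b into the sink's score column is only a placeholder):

from pyflink.table.expressions import col

# placeholder: copy column_b into the sink's single `score` column
table_path.select(col("column_b").alias("score")) \
    .execute_insert("table2") \
    .wait()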
However, when I run the script, registering the source table already fails with the following error:
File ".\flink1.py", line 41, in <module>
source_table = table_env.execute_sql(source_ddl)
File "C:\Users\landr\AppData\Local\Programs\Python\Python38\lib\site-packages\pyflink\table\table_environment.py", line 804, in execute_sql
return TableResult(self._j_tenv.executeSql(stmt))
File "C:\Users\landr\AppData\Local\Programs\Python\Python38\lib\site-packages\py4j\java_gateway.py", line 1285, in __call__
return_value = get_return_value(
File "C:\Users\landr\AppData\Local\Programs\Python\Python38\lib\site-packages\pyflink\util\exceptions.py",
line 147, in deco
return f(*a, **kw)
File "C:\Users\landr\AppData\Local\Programs\Python\Python38\lib\site-packages\py4j\protocol.py", line 326,
in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o10.executeSql.
: org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered ")" at line 6, column 25.
Was expecting one of:
"CONSTRAINT" ...
"PRIMARY" ...
"UNIQUE" ...
"WATERMARK" ...
<BRACKET_QUOTED_IDENTIFIER> ...
<QUOTED_IDENTIFIER> ...
<BACK_QUOTED_IDENTIFIER> ...
<HYPHENATED_IDENTIFIER> ...
<IDENTIFIER> ...
<UNICODE_QUOTED_IDENTIFIER> ...
at org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:56)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:96)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:722)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered ")" at line 6, column 25.
Was expecting one of:
"CONSTRAINT" ...
"PRIMARY" ...
"UNIQUE" ...
"WATERMARK" ...
<BRACKET_QUOTED_IDENTIFIER> ...
<QUOTED_IDENTIFIER> ...
<BACK_QUOTED_IDENTIFIER> ...
<HYPHENATED_IDENTIFIER> ...
<IDENTIFIER> ...
<UNICODE_QUOTED_IDENTIFIER> ...
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:450)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:213)
at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:140)
at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:155)
at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:180)
at org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:54)
... 13 more
Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered ")" at line 6, column 25.
Was expecting one of:
"CONSTRAINT" ...
"PRIMARY" ...
"UNIQUE" ...
"WATERMARK" ...
<BRACKET_QUOTED_IDENTIFIER> ...
<QUOTED_IDENTIFIER> ...
<BACK_QUOTED_IDENTIFIER> ...
<HYPHENATED_IDENTIFIER> ...
<IDENTIFIER> ...
<UNICODE_QUOTED_IDENTIFIER> ...
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:39782)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:39593)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.TableColumn(FlinkSqlParserImpl.java:4835)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateTable(FlinkSqlParserImpl.java:5209)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateExtended(FlinkSqlParserImpl.java:6233)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreate(FlinkSqlParserImpl.java:20934)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3415)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3918)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:261)
at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:153)
... 15 more
Upvotes: 0
Views: 1510