This is my code for calculating the average of each `ch[x]` field from a Kafka source using Apache Flink (PyFlink). I think I have imported all of the necessary libraries, but I'm getting an error when running it:
from numpy import average
from pyflink.table import TableEnvironment, EnvironmentSettings
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
from pyflink.table.udf import udf
from pyflink.table import *
def create_input():
    return """
        CREATE TABLE input(
            `gw_id` VARCHAR,
            `ch1` BIGINT,
            `ch2` BIGINT,
            `ch3` BIGINT,
            `ch4` BIGINT,
            `ch5` BIGINT,
            `ch6` BIGINT,
            `t` TIMESTAMP_LTZ(3),
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'energymeter.raw',
            'properties.bootstrap.servers' = '192.168.0.34:9092',
            'scan.startup.mode' = 'earliest-offset',
            'format' = 'json',
        )
    """
@udf(result_type=DataTypes.BIGINT())
def average_power(x):
    return x * 12 * 2

@udf(result_type=DataTypes.BIGINT())
def energy_consumption(x):
    return x / 500
def create_output():
    return """
        CREATE TABLE output (
            `gw_id` VARCHAR,
            `ch1` BIGINT,
            `ch2` BIGINT,
            `ch3` BIGINT,
            `ch4` BIGINT,
            `ch5` BIGINT,
            `ch6` BIGINT,
            `ch1_mod` BIGINT,
            `ch2_mod` BIGINT,
            `ch3_mod` BIGINT,
            `ch4_mod` BIGINT,
            `ch5_mod` BIGINT,
            `ch6_mod` BIGINT,
            `t` TIMESTAMP_LTZ(3)
        ) WITH (
            'connector' = 'kafka'
            'topic' = 'energymeter.processed',
            'properties.bootstrap.servers' = '192.168.0.34:9092',
            'format' = 'json'
        )
    """
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
table_env.execute_sql(create_input())
table_env.execute_sql(average_power())
table_env.execute_sql(energy_consumption())
table_env.execute_sql(create_output())
table_env.execute_sql("INSERT INTO output SELECT gw_id, t, ch1, average_power(ch1), ch2, average_power(ch2), ch3, average_power(ch3), ch4, average_power(ch4), ch5, average_power(ch5), ch6, average_power(ch6) FROM input").wait()
This is the error. I have added the SQL Kafka connector `flink-sql-connector-kafka_2.11-1.14.4.jar`, but nothing seems to work:
Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered ")" at line 11, column 9.
Was expecting one of:
"CONSTRAINT" ...
"PRIMARY" ...
"UNIQUE" ...
"WATERMARK" ...
<BRACKET_QUOTED_IDENTIFIER> ...
<QUOTED_IDENTIFIER> ...
<BACK_QUOTED_IDENTIFIER> ...
<HYPHENATED_IDENTIFIER> ...
<IDENTIFIER> ...
<UNICODE_QUOTED_IDENTIFIER> ...
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:472)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:235)
at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:140)
at org.apache.calcite.sql.parser.SqlParser.parseStmtList(SqlParser.java:195)
at org.apache.flink.table.planner.parse.CalciteParser.parseSqlList(CalciteParser.java:77)
... 13 more
There are many problems with your program, e.g.:

- a comma is missing after `'connector' = 'kafka'` in the output table definition
- there are extra trailing commas after `` `t` TIMESTAMP_LTZ(3)`` and after `'format' = 'json'` in the input table definition
- Python UDFs should be registered with `create_temporary_function` instead of being passed to `execute_sql`
- the column order in the `SELECT` clause is not consistent with the sink table `output` definition

I have made some modifications to it as follows:
from numpy import average
from pyflink.table import TableEnvironment, EnvironmentSettings
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
from pyflink.table.udf import udf
from pyflink.table import *
def create_input():
    return """
        CREATE TABLE input(
            `gw_id` VARCHAR,
            `ch1` BIGINT,
            `ch2` BIGINT,
            `ch3` BIGINT,
            `ch4` BIGINT,
            `ch5` BIGINT,
            `ch6` BIGINT,
            `t` TIMESTAMP_LTZ(3)
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'energymeter.raw',
            'properties.bootstrap.servers' = '192.168.0.34:9092',
            'scan.startup.mode' = 'earliest-offset',
            'format' = 'json'
        )
    """
@udf(result_type=DataTypes.BIGINT())
def average_power(x):
    return x * 12 * 2

@udf(result_type=DataTypes.BIGINT())
def energy_consumption(x):
    # Use integer division so the result matches the declared BIGINT result type.
    return x // 500
def create_output():
    return """
        CREATE TABLE output (
            `gw_id` VARCHAR,
            `ch1` BIGINT,
            `ch2` BIGINT,
            `ch3` BIGINT,
            `ch4` BIGINT,
            `ch5` BIGINT,
            `ch6` BIGINT,
            `ch1_mod` BIGINT,
            `ch2_mod` BIGINT,
            `ch3_mod` BIGINT,
            `ch4_mod` BIGINT,
            `ch5_mod` BIGINT,
            `ch6_mod` BIGINT,
            `t` TIMESTAMP_LTZ(3)
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'energymeter.processed',
            'properties.bootstrap.servers' = '192.168.0.34:9092',
            'format' = 'json'
        )
    """
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
table_env.execute_sql(create_input())
table_env.create_temporary_function("average_power", average_power)
table_env.create_temporary_function("energy_consumption", energy_consumption)
table_env.execute_sql(create_output())
table_env.execute_sql("INSERT INTO output SELECT gw_id, ch1, ch2, ch3, ch4, ch5, ch6, average_power(ch1), average_power(ch2), average_power(ch3), average_power(ch4), average_power(ch5), average_power(ch6), t FROM input").wait()