Ammanuel
Ammanuel

Reputation: 13

I'm getting an error when running the PyFlink code below.

This is code for calculating the average of each ch[x] from a Kafka source using Apache Flink (PyFlink). I think I have imported all of the necessary libraries.

And I'm getting this error when running the code

from numpy import average
from pyflink.table import TableEnvironment, EnvironmentSettings
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
from pyflink.table.udf import udf
from pyflink.table import *

def create_input():
    """Return the DDL for the Kafka-backed source table `input`.

    NOTE(review): this statement is what triggers the SqlParseException
    quoted below — Flink SQL does not allow the trailing comma after the
    last column (`t` TIMESTAMP_LTZ(3),) nor after the last WITH option
    ('format' = 'json',).
    """
    return """
        CREATE TABLE input(
          `gw_id` VARCHAR,
          `ch1` BIGINT,
          `ch2` BIGINT,
          `ch3` BIGINT,
          `ch4` BIGINT,
          `ch5` BIGINT,
          `ch6` BIGINT,
          `t` TIMESTAMP_LTZ(3),
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'energymeter.raw',
          'properties.bootstrap.servers' = '192.168.0.34:9092',
          'scan.startup.mode' = 'earliest-offset',
          'format' = 'json',
        )
    """
@udf(result_type=DataTypes.BIGINT())
def average_power(x):
  # Scalar Python UDF: multiplies a channel reading by 12 * 2.
  return x*12*2
@udf(result_type=DataTypes.BIGINT())
def energy_consumption(x):
  # Scalar Python UDF: divides a channel reading by 500.
  # NOTE(review): `/` is true division and yields a float, but the declared
  # result_type is BIGINT — confirm whether `x // 500` (or a DOUBLE result
  # type) was intended.
  return x/500
def create_output():
    """Return the DDL for the Kafka-backed sink table `output`.

    NOTE(review): the WITH clause is missing a comma after
    'connector' = 'kafka', which is another SQL parse error waiting to
    happen once the source DDL is fixed.
    """
    return """
        CREATE TABLE output (
          `gw_id` VARCHAR,
          `ch1` BIGINT,
          `ch2` BIGINT,
          `ch3` BIGINT,
          `ch4` BIGINT,
          `ch5` BIGINT,
          `ch6` BIGINT,
          `ch1_mod` BIGINT,
          `ch2_mod` BIGINT,
          `ch3_mod` BIGINT,
          `ch4_mod` BIGINT,
          `ch5_mod` BIGINT,
          `ch6_mod` BIGINT,
          `t` TIMESTAMP_LTZ(3)
        ) WITH (
          'connector' = 'kafka'
          'topic' = 'energymeter.processed',
          'properties.bootstrap.servers' = '192.168.0.34:9092',
          'format' = 'json'
        )
    """
# Set up a streaming TableEnvironment and wire source -> transform -> sink.
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
# Registers the source table; this is the call that raises the parse error.
table_env.execute_sql(create_input())
# NOTE(review): execute_sql expects a SQL string. average_power() and
# energy_consumption() are Python UDFs and must be registered with
# table_env.create_temporary_function(...) instead; calling them here passes
# their return value (a number / a UDF wrapper result) as "SQL".
table_env.execute_sql(average_power())
table_env.execute_sql(energy_consumption())
table_env.execute_sql(create_output())
# NOTE(review): the column order here (gw_id, t, ch1, ...) does not match the
# sink schema (gw_id, ch1..ch6, ch1_mod..ch6_mod, t).
table_env.execute_sql("INSERT INTO output SELECT gw_id, t, ch1, average_power(ch1), ch2, average_power(ch2), ch3, average_power(ch3), ch4, average_power(ch4), ch5, average_power(ch5), ch6, average_power(ch6) FROM input").wait()

The error is below. I have added the SQL Kafka connector (flink-sql-connector-kafka_2.11-1.14.4.jar) but nothing seems to work.


Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered ")" at line 11, column 9.
Was expecting one of:
    "CONSTRAINT" ...
    "PRIMARY" ...
    "UNIQUE" ...
    "WATERMARK" ...
    <BRACKET_QUOTED_IDENTIFIER> ...
    <QUOTED_IDENTIFIER> ...
    <BACK_QUOTED_IDENTIFIER> ...
    <HYPHENATED_IDENTIFIER> ...
    <IDENTIFIER> ...
    <UNICODE_QUOTED_IDENTIFIER> ...
    
        at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:472)
        at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:235)
        at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:140)
        at org.apache.calcite.sql.parser.SqlParser.parseStmtList(SqlParser.java:195)
        at org.apache.flink.table.planner.parse.CalciteParser.parseSqlList(CalciteParser.java:77)
        ... 13 more ```

Upvotes: 1

Views: 689

Answers (1)

Dian Fu
Dian Fu

Reputation: 221

There are many problems with your program, e.g.

  • Missing comma after `'connector' = 'kafka'` in the sink DDL, and extra trailing commas after `` `t` TIMESTAMP_LTZ(3)`` and `'format' = 'json'` in the source DDL
  • Should use create_temporary_function to register Python UDFs instead of execute_sql
  • The order of the fields in the SELECT clause is not consistent with the schema of the sink table `output`

I have made some modifications to it as follows:

from numpy import average
from pyflink.table import TableEnvironment, EnvironmentSettings
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
from pyflink.table.udf import udf
from pyflink.table import *

def create_input():
    """Return the corrected DDL for the Kafka-backed source table `input`.

    Compared with the question's version, the trailing commas after the
    last column and the last WITH option have been removed, which is what
    the SQL parser was rejecting.
    """
    return """
        CREATE TABLE input(
          `gw_id` VARCHAR,
          `ch1` BIGINT,
          `ch2` BIGINT,
          `ch3` BIGINT,
          `ch4` BIGINT,
          `ch5` BIGINT,
          `ch6` BIGINT,
          `t` TIMESTAMP_LTZ(3)
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'energymeter.raw',
          'properties.bootstrap.servers' = '192.168.0.34:9092',
          'scan.startup.mode' = 'earliest-offset',
          'format' = 'json'
        )
    """
@udf(result_type=DataTypes.BIGINT())
def average_power(x):
    """Scale a raw channel reading by a constant factor of 24 (12 * 2)."""
    return x * 24

@udf(result_type=DataTypes.BIGINT())
def energy_consumption(x):
  # Scalar Python UDF: divides a channel reading by 500.
  # NOTE(review): true division returns a float while the declared
  # result_type is BIGINT — confirm whether `x // 500` was intended.
  return x/500

def create_output():
    """Return the corrected DDL for the Kafka-backed sink table `output`.

    Compared with the question's version, the missing comma after
    'connector' = 'kafka' has been added.
    """
    return """
        CREATE TABLE output (
          `gw_id` VARCHAR,
          `ch1` BIGINT,
          `ch2` BIGINT,
          `ch3` BIGINT,
          `ch4` BIGINT,
          `ch5` BIGINT,
          `ch6` BIGINT,
          `ch1_mod` BIGINT,
          `ch2_mod` BIGINT,
          `ch3_mod` BIGINT,
          `ch4_mod` BIGINT,
          `ch5_mod` BIGINT,
          `ch6_mod` BIGINT,
          `t` TIMESTAMP_LTZ(3)
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'energymeter.processed',
          'properties.bootstrap.servers' = '192.168.0.34:9092',
          'format' = 'json'
        )
    """
# Set up a streaming TableEnvironment and wire source -> transform -> sink.
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
# Create the Kafka-backed source table.
table_env.execute_sql(create_input())
# Python UDFs are registered via create_temporary_function, not execute_sql.
table_env.create_temporary_function("average_power", average_power)
table_env.create_temporary_function("energy_consumption", energy_consumption)
# Create the Kafka-backed sink table.
table_env.execute_sql(create_output())
# SELECT column order now matches the sink schema:
# gw_id, ch1..ch6, ch1_mod..ch6_mod, t.
table_env.execute_sql("INSERT INTO output SELECT gw_id, ch1, ch2, ch3, ch4, ch5, ch6, average_power(ch1), average_power(ch2), average_power(ch3), average_power(ch4), average_power(ch5), average_power(ch6), t FROM input").wait()

Upvotes: 1

Related Questions