kalaiyarasan

Reputation: 3

Apache Flink job execution error using PyFlink

start reading data from kafka
Using Any for unsupported type: typing.Sequence[~T]
Traceback (most recent call last):
  File "C:\flink-1.19.0-bin-scala_2.12\flink-1.19.0\examples\python\datastream\main_sample.py", line 104, in <module>
    read_from_kafka()
  File "C:\flink-1.19.0-bin-scala_2.12\flink-1.19.0\examples\python\datastream\main_sample.py", line 83, in read_from_kafka
    env.execute("Kafka Streaming Job")
  File "C:\flink-1.19.0-bin-scala_2.12\flink-1.19.0\opt\python\pyflink.zip\pyflink\datastream\stream_execution_environment.py", line 813, in execute
  File "C:\flink-1.19.0-bin-scala_2.12\flink-1.19.0\opt\python\py4j-0.10.9.7-src.zip\py4j\java_gateway.py", line 1322, in __call__
  File "C:\flink-1.19.0-bin-scala_2.12\flink-1.19.0\opt\python\pyflink.zip\pyflink\util\exceptions.py", line 146, in deco
  File "C:\flink-1.19.0-bin-scala_2.12\flink-1.19.0\opt\python\py4j-0.10.9.7-src.zip\py4j\protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o9.execute.
: java.net.MalformedURLException: no protocol: ['file:/C:/flink-1.19.0-bin-scala_2.12/flink-1.19.0/opt/flink-python-1.19.0.jar']
        at java.base/java.net.URL.<init>(URL.java:645)
        at java.base/java.net.URL.<init>(URL.java:541)
        at java.base/java.net.URL.<init>(URL.java:488)
        at org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:133)
        at org.apache.flink.client.cli.ExecutionConfigAccessor.getJars(ExecutionConfigAccessor.java:77)
        at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:77)
        at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:72)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2440)
        at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:188)
        at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:117)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:834)

org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
        at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
        ... 14 more

I am trying to run a Kafka consumer job in Flink using PyFlink; my code is written in Python with the pyflink package.

I got the Flink dashboard running on Windows and did all the configuration setup. A simple word-count Python application job executed successfully, but this Kafka consumer Python code is the only one that fails to execute on the Flink dashboard.

Please help me resolve this.

Upvotes: 0

Views: 649

Answers (1)

SouravS

Reputation: 96

Short answer: all you need is the Kafka connector for your version of Flink.

Where do you get it? Simple: from the Maven Central repository (https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka).

But wait: you are using Flink 1.19, and the Kafka connector for this version has not yet been published as of 23-Mar-2024 (https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/).

So, you need to build it from the source code. And how do you do that? Luckily, the dependencies are already available.

Now use these following steps:

  1. Make sure you have Java 11 and Maven (I have Maven 3.9.6)
  2. Check out the repo (https://github.com/apache/flink-connector-kafka)
  3. Build the Kafka connector JAR for Flink 1.19 (refer to the snippet below)
  4. The JAR will be generated at: flink-connector-kafka/target/flink-connector-kafka-3.1-SNAPSHOT.jar
git clone https://github.com/apache/flink-connector-kafka.git
cd flink-connector-kafka
mvn clean package -Dflink.version=1.19.0 -DskipTests
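
Note that the connector JAR built this way does not bundle the Kafka client classes, so you also need kafka-clients on the classpath; the configuration below assumes kafka-clients-3.7.0.jar sits next to the connector JAR. One way to fetch it, assuming Maven is available (the output directory is the same placeholder directory used in the configuration below):

mvn dependency:copy -Dartifact=org.apache.kafka:kafka-clients:3.7.0 -DoutputDirectory=/SOME_PATH/lib-flink-1.19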

What to do with this JAR now?

  1. Make sure you have apache-flink==1.19.0 installed: pip install apache-flink==1.19.0
  2. Create a table/streaming environment in PyFlink
from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.in_streaming_mode()
s_env = TableEnvironment.create(env_settings)

# Flink 1.19.0: directory holding the connector and client JARs
DEPS_DIR = "/SOME_PATH/lib-flink-1.19"

# Both options take a single ';'-separated string of file:// URLs.
# DEPS_DIR already starts with '/', so "file://" + DEPS_DIR yields file:///...
s_env.get_config().set(
    "pipeline.jars",
    f"file://{DEPS_DIR}/flink-connector-kafka-3.1-SNAPSHOT.jar;file://{DEPS_DIR}/kafka-clients-3.7.0.jar",
)
s_env.get_config().set(
    "pipeline.classpaths",
    f"file://{DEPS_DIR}/flink-connector-kafka-3.1-SNAPSHOT.jar;file://{DEPS_DIR}/kafka-clients-3.7.0.jar",
)

# Sanity check: inspect the effective configuration
s_env.get_config().get_configuration().to_dict()
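
A gotcha that matches the traceback in the question: these options must be set to one plain ';'-separated string of URLs. The error no protocol: ['file:/...'] indicates the configured value was a bracketed, list-like string (for example a stringified Python list) rather than a URL string. Below is a minimal sketch of building the string safely on Windows; the C:/lib-flink-1.19 directory is a hypothetical placeholder.

# Hypothetical Windows layout; adjust to your actual directory.
WIN_DEPS_DIR = "C:/lib-flink-1.19"

# Join the URLs into one string -- passing a Python list (or its str())
# produces exactly the "no protocol: ['file:/...']" error from the question.
jar_urls = ";".join([
    f"file:///{WIN_DEPS_DIR}/flink-connector-kafka-3.1-SNAPSHOT.jar",
    f"file:///{WIN_DEPS_DIR}/kafka-clients-3.7.0.jar",
])
s_env.get_config().set("pipeline.jars", jar_urls)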

Test if that works

s_env.execute_sql('DROP TABLE IF EXISTS t1')

s_env.execute_sql("""
    CREATE TABLE t1 (
      symbol STRING,
      price FLOAT,
      ltt TIMESTAMP(3),
      WATERMARK FOR ltt AS ltt - INTERVAL '1' SECOND
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'test',
      'properties.bootstrap.servers' = 'kafka-host:port',
      'properties.group.id' = 'MY_TEST_GROUP',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'json'
    )
""")

Upvotes: 0
