Liam Zee

Reputation: 206

Apache Flink 1.14.0 - Unable to use python UDF through SQL DDL in Java

I'm trying to call a Python UDF registered through SQL DDL from a Java job (Flink 1.14.0).

Python file here:

from pyflink.table import DataTypes
from pyflink.table.udf import udf


@udf(input_types=[DataTypes.INT()], result_type=DataTypes.INT())
def add_one(a: int):
    return a + 1

Then I start the Flink cluster:

➜  flink-1.14.0 ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host magiclian-ubuntu.
Starting taskexecutor daemon on host magiclian-ubuntu.

Java code here:

public class PyUDF {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        //set cfg
        tEnv.getConfig().getConfiguration().setString("python.files",
                "/home/magic/workspace/python/flinkTestUdf/udfTest.py");
        tEnv.getConfig().getConfiguration().setString("python.client.executable", "python3");
        tEnv.executeSql(
                "CREATE TEMPORARY SYSTEM FUNCTION add1 AS 'udfTest.add_one' LANGUAGE PYTHON"
        );

        TableResult ret1 = tEnv.executeSql("select add1(3)");
        ret1.print();

        env.execute();
    }
}

And then run the job through Flink client:

flink run /home/magic/workspace/flink-jobs/UDF/pythonUDF/target/pythonUDF-1.0.0.jar

The error is:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL validation failed. Cannot instantiate user-defined function 'add1'.

But when I use sql-client to execute the same Python UDF, it runs successfully.

Start sql-client:

PYFLINK_CLIENT_EXECUTABLE=/usr/bin/python3  ./sql-client.sh embedded -pyexec /usr/bin/python3 -pyfs home/magic/workspace/python/flinkTestUdf/udfTest.py

Then

create temporary system function add1 as 'udfTest.add_one' language python;

Then

select add1(3);

I get the correct result, 4. Is there something wrong with my Java code?

I see that Python UDFs in SQL function DDL have been supported since version 1.11 (https://cwiki.apache.org/confluence/display/FLINK/FLIP-106%3A+Support+Python+UDF+in+SQL+Function+DDL), and I'm using 1.14.0.

Can anyone help me out?

Upvotes: 3

Views: 1226

Answers (2)

Metehan Yıldırım

Reputation: 401

Make sure that the PyFlink version and the Flink version used in the Java project match. For newcomers, the current pom.xml should be:

<properties>
        (...)
    <flink.version>1.15.1</flink.version>
</properties>
<dependencies>
    <!-- Apache Flink dependencies -->
    <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Add connector dependencies here. They must be in the default scope (compile). -->
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Add logging framework, to produce console output when running in the IDE. -->
    <!-- These dependencies are excluded from the application JAR by default. -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>2.17.2</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>2.17.2</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.17.2</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/net.sf.py4j/py4j -->
    <dependency>
        <groupId>net.sf.py4j</groupId>
        <artifactId>py4j</artifactId>
        <version>0.10.9.5</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table</artifactId>
        <version>${flink.version}</version>
        <type>pom</type>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- added as chunk -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-python -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-python_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- added as chunk -->
</dependencies>
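
To check the match on the Python side, you can compare the installed `apache-flink` package version with the `flink.version` from the pom. This is only a minimal sketch: the helper names are made up for illustration, the hard-coded `1.15.1` stands in for whatever your pom uses, and it assumes PyFlink was installed with pip under the distribution name `apache-flink` and that the script runs on Python 3.8+.

```python
from importlib import metadata  # available in the standard library since Python 3.8


def installed_pyflink_version():
    """Return the installed apache-flink version string, or None if it is missing."""
    try:
        return metadata.version("apache-flink")
    except metadata.PackageNotFoundError:
        return None


def versions_match(pyflink_version, flink_version):
    """True only when the PyFlink version is known and identical to the Flink one."""
    return pyflink_version is not None and pyflink_version == flink_version


if __name__ == "__main__":
    expected = "1.15.1"  # assumption: the value of <flink.version> in pom.xml
    found = installed_pyflink_version()
    if versions_match(found, expected):
        print(f"OK: apache-flink {found} matches Flink {expected}")
    else:
        print(f"Mismatch: apache-flink is {found}, pom.xml uses {expected}")
```

Run it with the same interpreter that the job uses (the one given by `python.client.executable`), otherwise it may report on a different environment.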

Upvotes: 0

Liam Zee

Reputation: 206

Make sure all the dependencies are installed.

Java :

  • 8 or 11

  • maven 3.5+

  • flink jars:

      <dependencies>
             <dependency>
                 <groupId>org.apache.flink</groupId>
                 <artifactId>flink-java</artifactId>
                 <version>${flink.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.flink</groupId>
                 <artifactId>flink-streaming-java_2.11</artifactId>
                 <version>${flink.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.flink</groupId>
                 <artifactId>flink-clients_2.11</artifactId>
                 <version>${flink.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.flink</groupId>
                 <artifactId>flink-table-api-java-bridge_2.11</artifactId>
                 <version>${flink.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.flink</groupId>
                 <artifactId>flink-table-common</artifactId>
                 <version>${flink.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.flink</groupId>
                 <artifactId>flink-table-planner_2.11</artifactId>
                 <version>${flink.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.flink</groupId>
                 <artifactId>flink-python_2.11</artifactId>
                 <version>${flink.version}</version>
             </dependency>
         </dependencies>
    

Python :

  • Python 3.6+
  • Apache Beam(== 2.19.0)
  • pip(>= 7.1.0)
  • setuptools(>= 37.0.0)
  • apache-flink (1.14.0)
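
The Python side of this list can be checked with a short script. This is a rough sketch, not an official tool: the function names are hypothetical, it only covers the interpreter, pip and setuptools minimums listed above, and it needs Python 3.8+ itself because it relies on `importlib.metadata`.

```python
import sys
from importlib import metadata  # available in the standard library since Python 3.8


def parse_version(text):
    """Turn '37.0.0' into (37, 0, 0); stops at the first non-numeric part."""
    parts = []
    for piece in text.split("."):
        if not piece.isdigit():
            break
        parts.append(int(piece))
    return tuple(parts)


def meets_minimum(found, minimum):
    """Compare dotted versions numerically, padding so '7.1' equals '7.1.0'."""
    a, b = parse_version(found), parse_version(minimum)
    width = max(len(a), len(b))
    a += (0,) * (width - len(a))
    b += (0,) * (width - len(b))
    return a >= b


def check_environment():
    """Return a list of problems; an empty list means the minimums are met."""
    problems = []
    if sys.version_info < (3, 6):
        problems.append("Python 3.6+ is required")
    # Minimum versions taken from the list above.
    for name, minimum in (("pip", "7.1.0"), ("setuptools", "37.0.0")):
        try:
            found = metadata.version(name)
        except metadata.PackageNotFoundError:
            problems.append(f"{name} is not installed")
            continue
        if not meets_minimum(found, minimum):
            problems.append(f"{name} {found} is older than {minimum}")
    return problems


if __name__ == "__main__":
    for problem in check_environment():
        print(problem)
```

An empty output means the interpreter, pip and setuptools are recent enough; the Beam and apache-flink versions still have to be checked separately.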

Upvotes: 1
