Reputation: 206
I'm trying to execute the python UDF function in SQL DDL(1.14.0)
Python file here:
from pyflink.table import DataTypes
from pyflink.table.udf import udf
@udf(input_types=[DataTypes.INT()], result_type=DataTypes.INT())
def add_one(a: int):
return a + 1
And start flink cluster:
➜ flink-1.14.0 ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host magiclian-ubuntu.
Starting taskexecutor daemon on host magiclian-ubuntu.
Java code here:
public class PyUDF {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
//set cfg
tEnv.getConfig().getConfiguration().setString("python.files",
"/home/magic/workspace/python/flinkTestUdf/udfTest.py");
tEnv.getConfig().getConfiguration().setString("python.client.executable", "python3");
tEnv.executeSql(
"CREATE TEMPORARY SYSTEM FUNCTION add1 AS 'udfTest.add_one' LANGUAGE PYTHON"
);
TableResult ret1 = tEnv.executeSql("select add1(3)");
ret1.print();
env.execute();
}
}
And then run the job through Flink client:
flink run /home/magic/workspace/flink-jobs/UDF/pythonUDF/target/pythonUDF-1.0.0.jar
Error is :
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL validation failed. Cannot instantiate user-defined function 'add1'.
But when i use sql-client to execute my py UDF, it runs successfully.
Start sql-client:
PYFLINK_CLIENT_EXECUTABLE=/usr/bin/python3 ./sql-client.sh embedded -pyexec /usr/bin/python3 -pyfs home/magic/workspace/python/flinkTestUdf/udfTest.py
Then
create temporary system function add1 as 'udfTest.add_one' language python;
Then
select add1(3);
I got the correct result 4
and is there something wrong with my code?
I see that the py UDF function was supported in version 1.11
https://cwiki.apache.org/confluence/display/FLINK/FLIP-106%3A+Support+Python+UDF+in+SQL+Function+DDL
, but now i'm using 1.14.0.
Who can help me out!
Upvotes: 3
Views: 1226
Reputation: 401
Make sure that the Pyflink version and Flink version in Java match. For the new commers, the current pom.xml should be
<properties>
(...)
<flink.version>1.15.1</flink.version>
</properties>
<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Add connector dependencies here. They must be in the default scope (compile). -->
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.17.2</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.17.2</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.17.2</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/net.sf.py4j/py4j -->
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
<version>0.10.9.5</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table</artifactId>
<version>${flink.version}</version>
<type>pom</type>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- added as chunk -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table</artifactId>
<version>${flink.version}</version>
<type>pom</type>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-python -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-python_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- added as chunk -->
</dependencies>
Upvotes: 0
Reputation: 206
Make sure the dependencies all installed.
Java :
8 or 11
maven 3.5+
flink jars:
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-python_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
Python :
Upvotes: 1