Jonathan Figueroa

Reputation: 31

Apache-Flink 1.11 Unable to use Python UDF in SQL Function DDL

According to this confluence page:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-106%3A+Support+Python+UDF+in+SQL+Function+DDL

Python UDFs are available in Flink 1.11 for use in SQL function DDL.

I went to the Flink docs here:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sqlClient.html

and tried this in the terminal, launching sql-client.sh with the following parameters:

$ sql-client.sh embedded --pyExecutable /Users/jonathanfigueroa/opt/anaconda3/bin/python --pyFiles /Users/jonathanfigueroa/Desktop/pyflink/inference/test1.py

and then:

> Create Temporary System Function func1 as 'test1.func1' Language PYTHON;
[INFO] Function has been created. 

and when I tried:

> Select func1(str) From (VALUES ("Name1", "Name2", "Name3"));
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Instantiating python function 'test1.func1' failed.

I have tried -pyarch/--pyArchives, -pyexec/--pyExecutable, and -pyfs/--pyFiles in every combination of .zip and .py, and always with the same result.

By the way, my Python file looks like this:

def func1(s):
    return s

Is there anything I'm missing?

Kind Regards,

Jonathan

Upvotes: 2

Views: 1089

Answers (1)

Wei Zhong

Reputation: 11

The Python UDF should be wrapped with the "udf" decorator from pyflink.table.udf, like this:

from pyflink.table.types import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.INT()], result_type=DataTypes.INT())
def add_one(a):
    return a + 1

And the flink-python JAR needs to be loaded when launching the SQL client, like this:

$ cd $FLINK_HOME/bin
$ ./start-cluster.sh
$ ./sql-client.sh embedded -pyfs xxx.py -j ../opt/flink-python_2.11-1.11.0.jar

In addition, you need to add taskmanager.memory.task.off-heap.size: 79mb to $FLINK_HOME/conf/flink-conf.yaml (or another place where configuration can be set, e.g. the SQL client environment file); otherwise you will get an error when executing the Python UDF:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: The configured Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb. The Task Off-Heap Memory can be configured using the configuration key 'taskmanager.memory.task.off-heap.size'.
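
Putting the pieces together for the func1 from the question (paths copied from the question; the column alias t(name) is an assumption), a working session might look like:

```
$ ./sql-client.sh embedded -pyfs /Users/jonathanfigueroa/Desktop/pyflink/inference/test1.py -j ../opt/flink-python_2.11-1.11.0.jar

> CREATE TEMPORARY SYSTEM FUNCTION func1 AS 'test1.func1' LANGUAGE PYTHON;

-- standard SQL uses single quotes for string literals, and VALUES needs
-- one parenthesized row per value to produce three rows:
> SELECT func1(name) FROM (VALUES ('Name1'), ('Name2'), ('Name3')) AS t(name);
```

Note that the query in the question, VALUES ("Name1", "Name2", "Name3"), would be parsed as a single row with three columns, and the double quotes would be treated as identifiers rather than string literals.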

Best,
Wei

Upvotes: 1
