Reputation: 31
In FLIP-106 there is an example of how to call a user-defined Python function in a batch-job Java application through SQL function DDL:
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
tEnv.getConfig().getConfiguration().setString("python.files", "/home/my/test1.py");
tEnv.getConfig().getConfiguration().setString("python.client.executable", "python3");
tEnv.sqlUpdate("create temporary system function func1 as 'test1.func1' language python");
Table table = tEnv.fromDataSet(env.fromElements("1", "2", "3")).as("str").select("func1(str)");
tEnv.toDataSet(table, String.class).collect();
I've been trying to reproduce the same example in a streaming-job Java application, and this is my code:
final StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(EnvironmentConfiguration.getEnv(), fsSettings);
fsTableEnv.getConfig().getConfiguration().setString("python.files", "/Users/jf/Desktop/flink/fca/test.py");
fsTableEnv.getConfig().getConfiguration().setString("python.client.executable", "/Users/jf/opt/anaconda3/bin/python");
fsTableEnv.sqlUpdate("CREATE TEMPORARY SYSTEM FUNCTION func1 AS 'test.func1' LANGUAGE PYTHON");
Table table = fsTableEnv.fromValues("1", "2", "3").as("str").select("func1(str)");
/* Missing line */
For this particular line in the batch job:
tEnv.toDataSet(table, String.class).collect();
I have not found an equivalent for the streaming job.
1. Can you help me map this FLIP-106 example from batch to streaming?
Ultimately, what I want is to call a Python function with Flink 1.11 from a streaming-job Java Flink application, like this:
final StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(EnvironmentConfiguration.getEnv(), fsSettings);
fsTableEnv.getConfig().getConfiguration().setString("python.files", "/Users/jf/Desktop/flink/fca/test.py");
fsTableEnv.getConfig().getConfiguration().setString("python.client.executable", "/Users/jf/opt/anaconda3/bin/python");
fsTableEnv.sqlUpdate("CREATE TEMPORARY SYSTEM FUNCTION func1 AS 'test.func1' LANGUAGE PYTHON");
final Table table = fsTableEnv.fromDataStream(stream_filtered.map(x->x.idsUmid)).select("func1(f0)").as("umid");
System.out.println("Result --> " + table.select($("umid")) + " --> End of Result");
and use the result of that UDF for further processing (not necessarily print it to the console).
I have edited the test.py file to check whether, regardless of the unnamed table, anything is being executed in Python at all:
from pyflink.table.types import DataTypes
from pyflink.table.udf import udf
from os import getcwd

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def func1(line):
    print(line)
    print(getcwd())
    with open("test.txt", "a") as myfile:
        myfile.write(line)
    return line
However, nothing is printed, the test.txt file is not created, and the value is not returned to the streaming job. So basically this Python function is not being called.
2. What am I missing here?
Thanks to David, Wei, and Xingbo for the support so far; every detail suggested has worked for me.
Best Regards,
Jonathan
Upvotes: 0
Views: 516
Reputation: 11
You can try this:
final StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(EnvironmentConfiguration.getEnv(), fsSettings);
fsTableEnv.getConfig().getConfiguration().setString("python.files", "/Users/jf/Desktop/flink/fca/test.py");
fsTableEnv.getConfig().getConfiguration().setString("python.client.executable", "/Users/jf/opt/anaconda3/bin/python");
// You need to specify the Python interpreter used to run the Python UDF on the cluster.
// I assume this is a local program, so it is the same as "python.client.executable".
fsTableEnv.getConfig().getConfiguration().setString("python.executable", "/Users/jf/opt/anaconda3/bin/python");
fsTableEnv.sqlUpdate("CREATE TEMPORARY SYSTEM FUNCTION func1 AS 'test.func1' LANGUAGE PYTHON");
final Table table = fsTableEnv.fromDataStream(stream_filtered.map(x->x.idsUmid)).select("func1(f0)").as("umid");
// 'table.select($("umid"))' will not trigger job execution. You need to call the "execute()" method explicitly.
table.execute().print();
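If the job runs but you still see no output from test.py, it can help to check the Python logic on its own first. The sketch below is only an illustration: func1_body and the file name func1_debug.txt are hypothetical names, not part of PyFlink, and it deliberately leaves out the @udf decorator so it runs without Flink. It writes to an explicit absolute path, on the assumption that the UDF may run in a separate worker process whose working directory and standard output are not the ones you are watching.

```python
import os
import tempfile


def func1_body(line, log_path):
    """Plain-Python stand-in for the body of func1 (hypothetical helper, no PyFlink needed)."""
    # Append to an explicit path so the output location does not depend
    # on whatever working directory the calling process happens to use.
    with open(log_path, "a") as myfile:
        myfile.write(line + "\n")
    return line


# Local check: run the logic on a few sample values, outside of Flink.
debug_dir = tempfile.mkdtemp()
debug_file = os.path.join(debug_dir, "func1_debug.txt")  # hypothetical file name
results = [func1_body(value, debug_file) for value in ["1", "2", "3"]]
print(results)
print("wrote", debug_file)
```

Once this behaves as expected, the same body can go back under the @udf decorator in test.py, keeping the absolute path so the file is easy to locate after the job has run.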
Upvotes: 0