Reputation: 61
I am new to PyFlink. I have done the official training exercise in Java: https://github.com/apache/flink-training
However, the project I am working on must use Python as a programming language. I want to know if it is possible to write a data generator using the "SourceFunction". In older PyFlink versions this was possible, using Jython: https://nightlies.apache.org/flink/flink-docs-release-1.7/dev/stream/python.html#streaming-program-example
In newer examples the data stream contains only a finite set of data, which is never extended. I have not found any example of a data generator in PyFlink comparable to the Java one, e.g. https://github.com/apache/flink-training/blob/master/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideGenerator.java
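For reference, this is roughly the finite-data pattern I mean; a minimal sketch based on the tutorial (the names and values are just placeholders I chose):

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# the stream is built from a fixed, in-memory collection and is never extended
ds = env.from_collection(
    collection=[(1, 'aaa'), (2, 'bbb')],
    type_info=Types.ROW([Types.INT(), Types.STRING()]))
ds.print()
env.execute("finite_example")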
I am not sure what functionality the SourceFunction and SinkFunction interfaces provide in PyFlink. Can they be used directly from Python, or only in combination with other pipelines or jar files? It looks like the methods "run()" and "cancel()" are not implemented, so they cannot be used like other classes by simply overriding them.
If they cannot be used directly in Python, are there other ways to use them? A simple example would be appreciated.
If that is not possible, are there other ways to write a data generator in an OOP style? Take this example: https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/datastream_tutorial/ There the split() method is used to separate the stream. Basically, I want to do this with an extra class that keeps extending the stream, which the Java TaxiRide example does via "ctx.collect()". I am trying to avoid Java, another framework for the pipeline, and Jython. A short example would be nice, but I appreciate any tips and advice.
I tried to use SourceFunction directly, but as already mentioned, I think this is the wrong approach; it results in the error AttributeError: 'DataGenerator' object has no attribute '_get_object_id':
from pyflink.datastream import SourceFunction


class DataGenerator(SourceFunction):

    def __init__(self):
        super().__init__(self)
        self._num_iters = 1000
        self._running = True

    def run(self, ctx):
        counter = 0
        while self._running and counter < self._num_iters:
            ctx.collect('Hello World')
            counter += 1

    def cancel(self):
        self._running = False
Upvotes: 3
Views: 1277
Reputation: 61
Solution: After looking at some older code that uses the SourceFunction and SinkFunction classes, I came to a solution. In that code a Kafka connector written in Java is used. The Python code below can be taken as an example of how to use PyFlink's SourceFunction and SinkFunction.
I have only written an example for the SourceFunction:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream import SourceFunction
from pyflink.java_gateway import get_gateway


class TaxiRideGenerator(SourceFunction):

    def __init__(self):
        # get the Java TaxiRideGenerator class through the Py4J gateway ...
        java_src_class = get_gateway().jvm \
            .org.apache.flink.training.exercises.common.sources.TaxiRideGenerator
        # ... instantiate it and hand the Java object to the SourceFunction wrapper
        java_src_obj = java_src_class()
        super(TaxiRideGenerator, self).__init__(java_src_obj)


def show(ds, env):
    # this is just a little helper to show the output of the pipeline
    ds.print()
    env.execute()


def streaming():
    # set up the Flink execution environment
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    taxi_src = TaxiRideGenerator()
    ds = env.add_source(taxi_src)

    show(ds, env)


if __name__ == "__main__":
    streaming()
The second line in the class __init__ was hard to find: I had expected the first line to already return an object, but the gateway call only returns the Java class, which still has to be instantiated before it is passed to the wrapper.
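The same wrapping pattern should also work for a sink. A hedged sketch (untested; it assumes Flink's Java PrintSinkFunction is already on the classpath and that SinkFunction lives in pyflink.datastream.functions in your PyFlink version):

from pyflink.datastream.functions import SinkFunction
from pyflink.java_gateway import get_gateway


class PrintSink(SinkFunction):

    def __init__(self):
        # wrap the Java PrintSinkFunction exactly like the source above
        j_sink_class = get_gateway().jvm \
            .org.apache.flink.streaming.api.functions.sink.PrintSinkFunction
        j_sink_obj = j_sink_class()
        super(PrintSink, self).__init__(j_sink_obj)

You would then call ds.add_sink(PrintSink()) instead of ds.print().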
You have to create a jar file after building the flink-training project. I changed into the build output directory until I could see the folder "org":
$ cd flink-training/flink-training/common/build/classes/java/main
flink-training/common/build/classes/java/main$ ls
org
flink-training/common/build/classes/java/main$ jar cvf flink-training.jar org/apache/flink/training/exercises/common/**/*.class
Copy the jar file to the pyflink/lib folder, normally under your python environment, e.g. flinkenv/lib/python3.8/site-packages/pyflink/lib. Then start the script.
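Alternatively, depending on your Flink version, it should also be possible to register the jar from the script via add_jars instead of copying it into pyflink/lib; a small sketch (adjust the file URL to wherever you built the jar):

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# make the Java TaxiRideGenerator classes available to this job only
env.add_jars("file:///path/to/flink-training.jar")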
Upvotes: 3