Reputation: 1831
So I have a simple aggregation job written in PyFlink SQL API. The job read data from AWS kinesis and output result to Kinesis.
I am curious if I can unit-test my pipeline with say pytest? I am guessing I need mock the source and sink with filesystem connector? but how can I create a local Flink session to run the job inside the pytest ? Do we have best practice recommendation here?
Thanks!
Upvotes: 1
Views: 891
Reputation: 43439
You should take a look at how the tests for PyFlink itself are implemented. It sets up various base classes for implementing table test cases; PyFlinkStreamTableTestCase
might a good place to start. Using this it's possible to write tests like this one that I've copied from here:
def test_sql_query(self):
t_env = self.t_env
source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
field_names = ["a", "b", "c"]
field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
t_env.register_table_sink(
"sinks",
source_sink_utils.TestAppendSink(field_names, field_types))
result = t_env.sql_query("select a + 1, b, c from %s" % source)
result.execute_insert("sinks").wait()
actual = source_sink_utils.results()
expected = ['+I[2, Hi, Hello]', '+I[3, Hello, Hello]']
self.assert_equals(actual, expected)
There are many more tests where that one came from.
Upvotes: 2