Alfred

Reputation: 1831

PyFlink SQL local test

So I have a simple aggregation job written with the PyFlink SQL API. The job reads data from AWS Kinesis and writes its result back to Kinesis.

I am curious whether I can unit-test my pipeline with, say, pytest. I am guessing I need to mock the source and sink with the filesystem connector, but how can I create a local Flink session to run the job inside pytest? Is there a best-practice recommendation here?

Thanks!

Upvotes: 1

Views: 891

Answers (1)

David Anderson

Reputation: 43439

You should take a look at how the tests for PyFlink itself are implemented. The PyFlink test suite sets up various base classes for implementing table test cases; PyFlinkStreamTableTestCase might be a good place to start. Using it, you can write tests like this one, which I've copied from the PyFlink test suite:

    # These helper modules live in PyFlink's own test sources
    # (flink-python/pyflink/testing in the Flink repository).
    from pyflink.table import DataTypes
    from pyflink.testing import source_sink_utils
    from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase


    class MySqlTests(PyFlinkStreamTableTestCase):  # class name is arbitrary

        def test_sql_query(self):
            # The base class provides a local TableEnvironment as self.t_env.
            t_env = self.t_env
            source = t_env.from_elements(
                [(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
            field_names = ["a", "b", "c"]
            field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
            t_env.register_table_sink(
                "sinks",
                source_sink_utils.TestAppendSink(field_names, field_types))

            result = t_env.sql_query("select a + 1, b, c from %s" % source)
            result.execute_insert("sinks").wait()
            actual = source_sink_utils.results()

            expected = ['+I[2, Hi, Hello]', '+I[3, Hello, Hello]']
            self.assert_equals(actual, expected)

There are many more tests where that one came from.
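
If you would rather write a plain pytest test without PyFlink's internal test utilities, you can also create a local TableEnvironment yourself; the job then runs in an embedded mini-cluster, so no external Flink session is needed. Here is a minimal sketch of mocking the Kinesis source with the filesystem connector, as you suggested. The table name, schema, test data, and aggregation query are placeholders, not your actual job, and it assumes the CSV format that ships with the PyFlink distribution:

    from pyflink.table import EnvironmentSettings, TableEnvironment


    def test_aggregation(tmp_path):
        # Batch mode is enough for a bounded test input and keeps asserts simple.
        t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())

        # Mock the Kinesis source with the filesystem connector.
        source_dir = tmp_path / "input"
        source_dir.mkdir()
        (source_dir / "data.csv").write_text("a,1\na,2\nb,3\n")

        t_env.execute_sql(f"""
            CREATE TABLE input_events (name STRING, amount INT) WITH (
                'connector' = 'filesystem',
                'path' = '{source_dir}',
                'format' = 'csv'
            )
        """)

        # Placeholder for the real aggregation query under test.
        result = t_env.execute_sql(
            "SELECT name, SUM(amount) AS total FROM input_events GROUP BY name")
        actual = {(row[0], row[1]) for row in result.collect()}

        assert actual == {("a", 3), ("b", 3)}

Note that collect() pulls the results back into the test process, so you can assert on them directly instead of mocking the sink with a second filesystem table.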

Upvotes: 2
