Reputation: 13892
I need to write a unit test for a Flink streaming topology. It's basically a CoFlatMapFunction, and it has 2 inputs.
I tried to get some inspiration from this page: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html
The order of the inputs matters to my topology, so when I test I can't use StreamExecutionEnvironment#fromCollection for each input, because I can't control the order in which data points are injected into each input.
I've tried to create a single input using StreamExecutionEnvironment#fromCollection and dispatch each element to the actual input of my CoFlatMapFunction based on its type, but the order of elements is lost in this operation.
Is there another way to write this test?
Upvotes: 2
Views: 714
Reputation: 43439
The Flink training exercises have an example of using a TwoInputStreamOperatorTestHarness that you can refer to.
You'll need these dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils-junit</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>test</scope>
    <type>test-jar</type>
</dependency>
<dependency>
    <groupId>org.mockito</groupId>
    <artifactId>mockito-all</artifactId>
    <version>1.10.19</version>
    <type>jar</type>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>test</scope>
    <type>test-jar</type>
</dependency>
You should keep in mind that this isn't a public, supported interface, so it could evolve in unexpected ways.
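The idea behind the harness is that the test itself, rather than the runtime, decides which input receives the next element. Here is a minimal sketch of that idea, not the harness itself. It assumes the function keeps only plain fields and does not rely on a Flink runtime context or managed state. The `Collector` and `CoFlatMapFunction` interfaces below are local stand-ins that mirror the shape of Flink's, so that the snippet compiles without Flink on the classpath; `PrefixJoin` and `runInterleaved` are hypothetical names made up for illustration. With the actual harness, the analogous calls are `processElement1` and `processElement2`.

```java
import java.util.ArrayList;
import java.util.List;

public class OrderedTwoInputSketch {

    // Local stand-in with the same shape as Flink's Collector
    // (assumption: only collect() is needed for this sketch).
    interface Collector<T> {
        void collect(T record);
    }

    // Local stand-in mirroring the two callbacks of Flink's CoFlatMapFunction.
    interface CoFlatMapFunction<IN1, IN2, OUT> {
        void flatMap1(IN1 value, Collector<OUT> out) throws Exception;
        void flatMap2(IN2 value, Collector<OUT> out) throws Exception;
    }

    // Hypothetical function under test: input 1 updates a prefix,
    // input 2 emits "<current prefix>:<number>". The result depends
    // on how the two inputs are interleaved.
    static class PrefixJoin implements CoFlatMapFunction<String, Integer, String> {
        private String prefix = "none";

        @Override
        public void flatMap1(String value, Collector<String> out) {
            prefix = value;
        }

        @Override
        public void flatMap2(Integer value, Collector<String> out) {
            out.collect(prefix + ":" + value);
        }
    }

    // Feeds both inputs in exactly the order written below and
    // returns everything the function emitted.
    static List<String> runInterleaved() throws Exception {
        List<String> emitted = new ArrayList<>();
        Collector<String> out = emitted::add;
        PrefixJoin fn = new PrefixJoin();

        fn.flatMap2(1, out);   // input 2 arrives before any prefix is set
        fn.flatMap1("a", out); // input 1 sets the prefix
        fn.flatMap2(2, out);
        fn.flatMap1("b", out);
        fn.flatMap2(3, out);
        return emitted;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runInterleaved()); // prints [none:1, a:2, b:3]
    }
}
```

Because the test calls the two callbacks itself, the interleaving is fully deterministic. If the function uses keyed state, timers, or a `RuntimeContext`, calling the callbacks directly like this will not be enough, and the operator test harness is the better fit.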
Upvotes: 1
Reputation: 9245
You want to use the TwoInputStreamOperatorTestHarness class. Unfortunately, the documentation is a little sparse. I have a test that uses this class, but it's not yet pushed to the 133_stream-test-harness branch of flink-crawler.
Upvotes: 1