0x26res

Reputation: 13892

Integration test for complex topology (multiple inputs) in Flink

I need to write a unit test for a Flink streaming topology. It is basically a CoFlatMapFunction with two inputs.

I tried to get some inspiration from this page: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html

The order of the inputs matters to my topology, so in the test I can't use StreamExecutionEnvironment#fromCollection for each input, because I would not control the order in which data points are injected into each input.

I've tried to create a single input using StreamExecutionEnvironment#fromCollection and dispatch each element to the matching input of my CoFlatMapFunction based on its type, but the order of the elements is lost in this operation.

Is there another way to write this test?

Upvotes: 2

Views: 714

Answers (2)

David Anderson

Reputation: 43439

The Flink training exercises include an example that uses a TwoInputStreamOperatorTestHarness, which you can refer to:

https://github.com/dataArtisans/flink-training-exercises/blob/master/src/test/java/com/dataartisans/flinktraining/exercises/datastream_java/process/EventTimeJoinTest.java

You'll need these dependencies:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-test-utils-junit</artifactId>
  <version>${flink.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>${flink.version}</version>
  <scope>test</scope>
  <type>test-jar</type>
</dependency>

<dependency>
  <groupId>org.mockito</groupId>
  <artifactId>mockito-all</artifactId>
  <version>1.10.19</version>
  <type>jar</type>
  <scope>test</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-runtime_2.11</artifactId>
  <version>${flink.version}</version>
  <scope>test</scope>
  <type>test-jar</type>
</dependency>

You should keep in mind that this isn't a public, supported interface, so it could evolve in unexpected ways.
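
To make the ordering idea concrete, here is a minimal, dependency-free sketch of what a two-input harness lets a test do: call the first-input and second-input callbacks directly, in exactly the interleaving you choose, and inspect what was emitted. Everything in it is an assumption for illustration. `TwoInputFunction` is a hypothetical stand-in that only mirrors the flatMap1/flatMap2 shape of CoFlatMapFunction, `BlockListFilter` is an invented example function, and a plain `Consumer` plays the role of Flink's Collector. None of it is Flink API. With the real harness, the corresponding calls are, as far as I know, processElement1 and processElement2.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

class TwoInputOrderSketch {

    /** Hypothetical stand-in shaped like a two-input flat map (NOT the Flink interface). */
    interface TwoInputFunction<IN1, IN2, OUT> {
        void flatMap1(IN1 value, Consumer<OUT> out);
        void flatMap2(IN2 value, Consumer<OUT> out);
    }

    /** Input 1 carries words to block; input 2 carries words that are forwarded unless blocked. */
    static class BlockListFilter implements TwoInputFunction<String, String, String> {
        private final Set<String> blocked = new HashSet<>();

        @Override
        public void flatMap1(String word, Consumer<String> out) {
            blocked.add(word); // control input: remember the word, emit nothing
        }

        @Override
        public void flatMap2(String word, Consumer<String> out) {
            if (!blocked.contains(word)) {
                out.accept(word); // data input: forward only words not blocked so far
            }
        }
    }

    /** Drives both inputs in one explicit interleaving and returns what was emitted. */
    static List<String> run() {
        BlockListFilter fn = new BlockListFilter();
        List<String> emitted = new ArrayList<>();
        fn.flatMap2("spam", emitted::add); // arrives before the block rule: emitted
        fn.flatMap1("spam", emitted::add); // block rule arrives on input 1
        fn.flatMap2("spam", emitted::add); // same word again: now dropped
        fn.flatMap2("ham", emitted::add);  // never blocked: emitted
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [spam, ham]
    }
}
```

Because the test itself decides the call order, the result is deterministic. Swapping the first two calls would give `[ham]` instead, which is exactly the kind of order-dependent behaviour that two independent fromCollection sources cannot pin down.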

Upvotes: 1

kkrugler

Reputation: 9245

You want to use the TwoInputStreamOperatorTestHarness class. Unfortunately, the documentation is a little sparse. I have a test that uses this class, but it has not yet been pushed to the 133_stream-test-harness branch of flink-crawler.

Upvotes: 1
