Reputation: 465
Does anyone know how to test windowing functions in Flink
? I am using the dependency flink-test-utils_2.11
.
My steps are:
StreamExecutionEnvironment
keyBy
public class AggregateVariantCEVTest extends AbstractTestBase {
@Test
public void testAggregateVariantCev() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.fromElements(objectOne, objectTwo)
.keyBy(new KeyedByMyCustomKey())
.window(EventTimeSessionWindows.withGap(Time.seconds(1)))
.aggregate(new MyAgreggateFunction());
JobExecutionResult result = env.execute();
assertEquals(myExpectedResults, result.getAllAccumulatorResults());
}
}
The problem is that result.getAllAccumulatorResults()
size is 0.
Any ideas what I am doing wrong? Thanks in advance!
Upvotes: 0
Views: 2635
Reputation: 921
Probably the right approach here is to use a TestHarness
. A good example is the WindowOperatorTest
in the Flink project itself.
Furthermore, you can checkout https://github.com/knaufk/flink-testing-pyramid for examples how to test Flink Job on different levels of the testing pyramid and the Flink documentation on testing https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/testing.html.
Upvotes: 1
Reputation: 43439
Windows don't put their results into accumulators. You should attach a test sink to your job and then compare that sink's contents to what you expect. Something like what's shown in the documentation in the section on integration testing.
Upvotes: 1