Marcos Schroh
Marcos Schroh

Reputation: 465

How to properly test a Flink window function?

Does anyone know how to test windowing functions in Flink? I am using the dependency flink-test-utils_2.11.

My steps are:

  1. Get the StreamExecutionEnvironment
  2. Create objects and add to the invironment
  3. Do a keyBy
  4. add a Session Window
  5. execute an aggregate function
public class AggregateVariantCEVTest extends AbstractTestBase {

   @Test
    public void testAggregateVariantCev() throws Exception  {
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setParallelism(1);

       env.fromElements(objectOne, objectTwo)
               .keyBy(new KeyedByMyCustomKey())
               .window(EventTimeSessionWindows.withGap(Time.seconds(1)))
               .aggregate(new MyAgreggateFunction());


       JobExecutionResult result = env.execute();

       assertEquals(myExpectedResults, result.getAllAccumulatorResults());

   }
}

The problem is that result.getAllAccumulatorResults() size is 0.

Any ideas what I am doing wrong? Thanks in advance!

Upvotes: 0

Views: 2635

Answers (2)

snntrable
snntrable

Reputation: 921

Probably the right approach here is to use a TestHarness. A good example is the WindowOperatorTest in the Flink project itself.

Furthermore, you can checkout https://github.com/knaufk/flink-testing-pyramid for examples how to test Flink Job on different levels of the testing pyramid and the Flink documentation on testing https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/testing.html.

Upvotes: 1

David Anderson
David Anderson

Reputation: 43439

Windows don't put their results into accumulators. You should attach a test sink to your job and then compare that sink's contents to what you expect. Something like what's shown in the documentation in the section on integration testing.

Upvotes: 1

Related Questions