Fred Fettinger

Reputation: 226

Is it possible to write a unit test that terminates using the Flink StateFun Harness?

I'm working on a new project using Flink Stateful Functions. I've written some basic unit tests using FunctionTestHarness, but a test written this way can't exercise the interaction between multiple stateful functions.

The Flink testing documentation (for core Flink, not for Stateful Functions) demonstrates how to run a complete job using MiniClusterWithClientResource and then make assertions on the job's output. I'm looking for a way to do something similar with Stateful Functions.

The statefun-flink-harness-example looked very promising, but its RunnerTest, which uses Harness, is marked with @Ignore because it never terminates. That is useful for debugging, but it can't be used in an automated test.

Here are the problems I've identified so far that make it difficult to write a test with Harness that terminates:

  1. Harness uses a SerializableSupplier to provide input, and there's no way for the SerializableSupplier to signal that it's done. This means any test using Harness is always waiting for more input.
  2. Even if Harness knew that all input had been sent, it would still need a way to terminate once no events were pending.
  3. As an additional complication, some systems would still never terminate because of delayed events sent by Context.sendAfter().
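The three conditions above can be made concrete with a toy, single-threaded model. This is an illustration only and has nothing to do with how Flink or StateFun actually schedule work; the class TerminationModel, the "delayed:" prefix, and the fixed one-follow-up-per-message behavior are all hypothetical. The loop may stop only when the finite input is exhausted, nothing is pending, and no delayed message (standing in for Context.sendAfter()) is still scheduled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Toy model only: NOT how StateFun or Flink schedule work internally.
final class TerminationModel {

    // A message scheduled for later delivery, standing in for Context.sendAfter().
    static final class Delayed {
        final long dueAt;
        final long seq;        // tie-breaker so equal due times keep scheduling order
        final String payload;

        Delayed(long dueAt, long seq, String payload) {
            this.dueAt = dueAt;
            this.seq = seq;
            this.payload = payload;
        }
    }

    // Each input message is "handled" and also schedules one delayed follow-up.
    static List<String> run(List<String> input, long delay) {
        Iterator<String> source = input.iterator();   // finite input that can say "done" (problem 1)
        Deque<String> pending = new ArrayDeque<>();    // messages waiting to be handled (problem 2)
        PriorityQueue<Delayed> timers = new PriorityQueue<>(   // scheduled delayed messages (problem 3)
            Comparator.comparingLong((Delayed d) -> d.dueAt).thenComparingLong(d -> d.seq));
        List<String> output = new ArrayList<>();
        long now = 0;
        long seq = 0;

        // Stop only when input is exhausted AND nothing is pending AND no timer is scheduled.
        while (source.hasNext() || !pending.isEmpty() || !timers.isEmpty()) {
            if (source.hasNext()) {
                pending.add(source.next());
            } else if (pending.isEmpty()) {
                // Only delayed messages remain: jump logical time to the earliest one.
                Delayed next = timers.poll();
                now = next.dueAt;
                pending.add(next.payload);
            }
            String msg = pending.poll();
            if (msg.startsWith("delayed:")) {
                output.add(msg + "@" + now);   // delayed messages schedule nothing further
            } else {
                output.add(msg);
                timers.add(new Delayed(now + delay, seq++, "delayed:" + msg));
            }
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("hello", "world"), 10));
    }
}
```

Running main prints [hello, world, delayed:hello@10, delayed:world@10]. If a handler instead scheduled a new delayed message every time a delayed one arrived, the timers queue would never drain and the loop would never exit, which is exactly problem 3.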

I'd think this is a common need, since it would enable more interesting automated tests that can run in CI/CD pipelines. Has anyone found a way to solve the above, or perhaps discovered an entirely different method using tools other than Harness?

Upvotes: 0

Views: 508

Answers (1)

Igal

Reputation: 491

The Harness also contains a .withFlinkSourceFunction() method which allows using any Flink SourceFunction as an ingress.

You can create your own source function that would produce a finite collection of elements, for example:

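import java.io.Serializable;
import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Emits a fixed list of items and then returns from run(), which marks the
// source as finished so the job can end once all records are processed.
class FiniteSource<T extends Serializable> implements SourceFunction<T> {
    private final List<T> items;          // the list itself must be serializable, e.g. Arrays.asList(...)
    private volatile boolean running = true;

    FiniteSource(List<T> items) {
        this.items = items;
    }

    @Override
    public void run(SourceContext<T> sourceContext) {
        for (T item : items) {
            if (!running) {
                return;
            }
            // Emit under the checkpoint lock, as SourceFunction implementations are expected to.
            synchronized (sourceContext.getCheckpointLock()) {
                sourceContext.collect(item);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}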

Then, you can modify the harness example in the following way:

    // Requires java.util.Arrays; MyInputMessage and MyConstants come from the harness example.
    FiniteSource<MyInputMessage> finiteSource = new FiniteSource<>(
        Arrays.asList(
            new MyInputMessage("user-1", "hello"),
            new MyInputMessage("user-2", "world")));

    Harness harness =
        new Harness()
            .withKryoMessageSerializer()
            .withFlinkSourceFunction(MyConstants.REQUEST_INGRESS, finiteSource)
            .withPrintingEgress(MyConstants.RESULT_EGRESS);

    // start() runs the job and blocks until it finishes.
    harness.start();

This should terminate after the source has produced its two input messages into the ingress. If you believe this is a common requirement, I would encourage you to bring it up on the Flink mailing list; I'm sure the friendly community there would be happy to accept your feedback, and even more so a contribution ;)
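To assert on results instead of printing them, the printing egress could be swapped for a consuming one. Harness appears to offer a withConsumingEgress(egressId, consumer) method that takes a serializable consumer, but treat that name and signature as an assumption to check against your StateFun version. One catch: user functions are serialized before the job runs, so a consumer that collects into an instance field of the object created in the test never fills that object's field. The CollectSink example in the Flink testing documentation collects into a static list for this reason. The pure-Java sketch below does not use Flink at all; it simulates that shipping step with Java serialization. SerializableConsumer, InstanceCollector, StaticCollector, RESULTS and roundTrip are hypothetical names defined only for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

final class EgressCollectionDemo {

    // Hypothetical stand-in for a serializable egress consumer.
    interface SerializableConsumer<T> extends Consumer<T>, Serializable {}

    // Static, thread-safe sink: not serialized, so every copy in this JVM shares it.
    static final Queue<String> RESULTS = new ConcurrentLinkedQueue<>();

    // Collects into an instance field: each deserialized copy gets its own list.
    static final class InstanceCollector implements SerializableConsumer<String> {
        final List<String> seen = new ArrayList<>();
        public void accept(String s) { seen.add(s); }
    }

    // Collects into the static queue instead of its own state.
    static final class StaticCollector implements SerializableConsumer<String> {
        public void accept(String s) { RESULTS.add(s); }
    }

    // Simulates shipping a function to a task: serialize it and return a deserialized copy.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        InstanceCollector local = new InstanceCollector();
        InstanceCollector shipped = roundTrip(local);
        shipped.accept("hello");
        System.out.println(local.seen);   // the copy received the message, not the original

        StaticCollector collector = roundTrip(new StaticCollector());
        collector.accept("hello");
        System.out.println(RESULTS);      // the static queue sees it regardless of copying
    }
}
```

In a real test, the static collection would need to be cleared before each run, and the assertions would go after harness.start() returns.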

> Has anyone found a way to solve the above, or perhaps discovered an entirely different method using tools other than Harness?

For CI/CD pipelines, I would recommend checking out our end-to-end (e2e) tests, which are based on Testcontainers (for example this).

Upvotes: 4
