I'm working on a new project using Flink Stateful Functions. I've written some basic unit tests using FunctionTestHarness, but tests written this way can't exercise the interaction between multiple stateful functions.
The Flink testing documentation (for core Flink, not Stateful Functions) demonstrates how to run a complete job using MiniClusterWithClientResource and then make assertions on the job's output. I'm looking for a way to do something similar with Stateful Functions.
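For context, the part of that pattern that makes assertions possible is that the test sink writes its results into a static, synchronized list rather than an instance field: Flink serializes the sink and runs copies of it, so the test's own instance never sees the data. The stdlib-only sketch below illustrates why. `CollectingSink` and `copyViaSerialization` are hypothetical stand-ins (not Flink classes); the serialization round trip only mimics Flink shipping the sink to a worker.

```java
import java.io.*;
import java.util.*;

public class CollectingSinkDemo {
  // Hypothetical stand-in for a Flink sink. Flink runs serialized copies of sinks,
  // so results must go to a static, thread-safe list to be visible to the test.
  static class CollectingSink implements Serializable {
    static final List<String> VALUES = Collections.synchronizedList(new ArrayList<>());
    final List<String> instanceValues = new ArrayList<>(); // NOT visible to the test

    void invoke(String value) {
      VALUES.add(value);
      instanceValues.add(value);
    }
  }

  // Mimics shipping an object to a worker by round-tripping it through Java serialization.
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T copyViaSerialization(T obj) throws Exception {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (T) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    CollectingSink testSideSink = new CollectingSink();
    CollectingSink workerSideCopy = copyViaSerialization(testSideSink);
    workerSideCopy.invoke("hello");
    workerSideCopy.invoke("world");
    System.out.println(CollectingSink.VALUES);       // [hello, world]
    System.out.println(testSideSink.instanceValues); // []
  }
}
```

Static fields are not part of the serialized form, so every copy appends to the same list, while the copied instance field is a separate list the test never observes.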
The statefun-flink-harness-example looked very promising, but its RunnerTest, which uses Harness, is marked with @Ignore because it never terminates. That is useful for debugging, but it can't be used in an automated test.
Here are the problems I've identified so far that make it difficult to write a Harness-based test that terminates:
.sendAfter()
I'd expect this to be a common need, since it enables more interesting automated tests that can run in CI/CD pipelines. Has anyone found a way to solve the problems above, or discovered an entirely different approach using tools other than Harness?
Upvotes: 0
Views: 508
The Harness also has a .withFlinkSourceFunction() method, which lets you use any Flink SourceFunction as an ingress.
You can write your own source function that produces a finite collection of elements, for example:
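```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Emits a fixed list of elements and then returns from run(), so the source is bounded.
class FiniteSource<T extends Serializable> implements SourceFunction<T> {
  private final List<T> items;
  private volatile boolean running = true; // cleared by cancel()

  FiniteSource(List<T> items) {
    this.items = new ArrayList<>(items); // ArrayList is Serializable; Flink serializes sources
  }

  @Override
  public void run(SourceContext<T> sourceContext) {
    for (T item : items) {
      if (!running) break;
      synchronized (sourceContext.getCheckpointLock()) { // emit under the checkpoint lock
        sourceContext.collect(item);
      }
    }
  }

  @Override
  public void cancel() {
    running = false;
  }
}
```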
Then, you can modify the harness example in the following way:
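```java
// MyInputMessage and MyConstants are the classes from statefun-flink-harness-example.
FiniteSource<MyInputMessage> finiteSource = new FiniteSource<>(
    Arrays.asList(
        new MyInputMessage("user-1", "hello"),
        new MyInputMessage("user-2", "world")));

Harness harness =
    new Harness()
        .withKryoMessageSerializer()
        // Use the finite source as the request ingress.
        .withFlinkSourceFunction(MyConstants.REQUEST_INGRESS, finiteSource)
        .withPrintingEgress(MyConstants.RESULT_EGRESS);

// Runs the job; with a finite source, this should return once the input is exhausted.
harness.start();
```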
This should terminate after the two input messages have been produced into the ingress. If you believe this would be a common requirement, I'd encourage you to bring it up on the Flink mailing list; I'm sure the friendly community there would be happy to accept your feedback, and even more so a contribution ;)
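As an extra safety net in CI (my own assumption, not something the Harness provides), you could run `harness.start()` on a separate thread with a timeout, so a test that does hang fails instead of blocking the build. `TimeoutRunner` and `runWithTimeout` below are hypothetical names; the sketch uses only `java.util.concurrent`. Note that on timeout it only interrupts the worker thread, which does not necessarily stop an embedded Flink job.

```java
import java.time.Duration;
import java.util.concurrent.*;

public class TimeoutRunner {
  // Hypothetical helper: runs a blocking task on a daemon thread and fails
  // with an AssertionError if it does not return within the timeout.
  public static void runWithTimeout(Callable<Void> task, Duration timeout) throws Exception {
    ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
      Thread t = new Thread(r, "harness-runner");
      t.setDaemon(true); // don't keep the JVM alive if the task hangs
      return t;
    });
    Future<Void> future = executor.submit(task);
    try {
      future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      future.cancel(true); // interrupts the thread; the task may or may not react to it
      throw new AssertionError("task did not finish within " + timeout, e);
    } catch (ExecutionException e) {
      // Re-throw the task's own failure so the test reports it directly.
      Throwable cause = e.getCause();
      if (cause instanceof Exception) throw (Exception) cause;
      throw e;
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) throws Exception {
    // A task that finishes quickly passes.
    runWithTimeout(() -> { Thread.sleep(50); return null; }, Duration.ofSeconds(2));
    System.out.println("finished in time");
    // A task that never finishes fails with an AssertionError.
    try {
      runWithTimeout(() -> { while (true) { Thread.sleep(1000); } }, Duration.ofMillis(200));
    } catch (AssertionError expected) {
      System.out.println("timed out as expected");
    }
  }
}
```

In a test, the call would look something like `TimeoutRunner.runWithTimeout(() -> { harness.start(); return null; }, Duration.ofMinutes(2));`.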
Has anyone found a way to solve the above, or perhaps discovered an entirely different method using tools other than Harness?
For CI/CD pipelines, I'd recommend checking out our end-to-end tests, which are based on Testcontainers (for example, this one).
Upvotes: 4