Reputation: 3328
I am testing a WebClient that returns a Flux, and I need to wait for it to initialise properly. Like this:
I set up a Flux as null:
private Flux<Event> events = null;
Then I call a WebClient to get the Flux from a remote URL:
Flux<String> events = getFlux(guid);
The WebClient is:
WebClient client; // already set up with headers and URL
public Flux<String> getFlux(String guid) {
    return client.get()
        .uri(Props.getBaseEndpoint() + "?id=" + guid)
        .retrieve()
        .onStatus(status -> status.value() == 401, clientResponse -> Mono.empty())
        .bodyToFlux(String.class)
        .timeout(Duration.ofSeconds(Props.getTimeout()));
}
The getFlux method appears to return before the Flux is completely initialised, so I want to wait a couple of seconds for it:
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(isFluxInitialised());
where isFluxInitialised() is something like:
public Callable<Boolean> isFluxInitialised() {
    return new Callable<Boolean>() {
        public Boolean call() throws Exception {
            return events != null;
        }
    };
}
Waiting for the Flux to be non-null still causes a race condition in the test. I can't figure out what to wait for so that getFlux has returned an initialised Flux that can then be subscribed to. The test continues with a subscription to the Flux as below, but finishes before the test data that's sent to the remote endpoint can arrive in the subscription:
events.subscribe(e -> Logs.Info("event: " + e));
Here's the intellisense
Upvotes: 0
Views: 814
Reputation: 5934
There is no reason to introduce the blocking operator blockFirst() into the flow. I am still not sure about the use case, but technically you are trying to wait for the first element from the Flux. The same can be achieved without blocking:
AtomicBoolean elementAvailable = new AtomicBoolean();
getFlux(guid)
    .doOnNext(rec -> elementAvailable.set(true))
    .subscribe();
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(elementAvailable::get);
Upvotes: 1
Reputation: 3328
Thanks to @Alex's answer about assembly vs. subscription time. That was the problem. I actually got Awaitility to work properly by no longer waiting at "assembly" time (which I couldn't get to work) and instead waiting on the first subscription, using blockFirst() as below:
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(isFluxInitialised());
and
public Callable<Boolean> isFluxInitialised() {
    return new Callable<Boolean>() {
        public Boolean call() throws Exception {
            return events.blockFirst() != null;
        }
    };
}
Upvotes: 0
Reputation: 5934
I am not sure I understand the logic of isFluxInitialised, but judging by the description you may be confusing assembly time with subscription time. Also, please note that subscribe is not a synchronous operation, so your program could exit before results are available.
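The assembly versus subscription distinction can be hard to see inside a WebClient call. As a rough analogy only (this uses java.util.stream rather than Reactor, and the class name is made up for illustration), a lazy pipeline has the same shape: building it runs none of its steps, and the work only happens once something consumes it. In Reactor, subscribe() plays that consuming role, and for an asynchronous source such as a WebClient response it typically returns before any element has arrived.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy only: a java.util.stream pipeline standing in for a cold Flux.
public class LazyPipelineAnalogy {

    // Returns a log of side effects in the order they actually happened.
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // "Assembly": the pipeline is described, but peek() has not run yet.
        Stream<String> pipeline = Stream.of("a", "b")
                .peek(e -> log.add("saw " + e));
        log.add("assembled");

        // "Subscription": the terminal operation pulls elements through.
        List<String> out = pipeline.collect(Collectors.toList());
        log.add("collected " + out);
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The "assembled" entry is logged before either "saw" entry, which is why a non-null check on the returned object says nothing about whether data has started flowing.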
I would suggest starting with a unit test using StepVerifier to make sure your flow is correct:
StepVerifier.create(getFlux(...))
    .expectNextCount(count)
    .verifyComplete();
If your logic needs to wait until the Flux completes, you can use the common CountDownLatch pattern. The same can be achieved with Awaitility if you like.
CountDownLatch completionLatch = new CountDownLatch(1);
getFlux(...)
    .doOnComplete(completionLatch::countDown)
    .doOnNext(e -> Logs.Info("event: " + e))
    .subscribe();
completionLatch.await();
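One caveat with the latch pattern above: a bare await() blocks forever if the stream never completes, which hangs the test instead of failing it. A minimal sketch of a bounded wait follows. To keep it runnable without Reactor on the classpath, a plain thread stands in for the subscribed Flux, and the class and method names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch: a background thread stands in for an asynchronous Flux subscription.
public class BoundedLatchWait {

    // Collects everything the stand-in source emits, or fails after the timeout.
    static List<String> collectWithTimeout(List<String> source, long timeoutMillis) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch completionLatch = new CountDownLatch(1);

        Thread producer = new Thread(() -> {
            for (String e : source) {
                received.add(e);             // plays the role of doOnNext
            }
            completionLatch.countDown();     // plays the role of doOnComplete
        });
        producer.setDaemon(true);
        producer.start();

        try {
            // await(timeout, unit) returns false if the latch was not released in time.
            if (!completionLatch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException(
                        "source did not complete within " + timeoutMillis + " ms");
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while waiting", ex);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(collectWithTimeout(List.of("a", "b", "c"), 5000));
    }
}
```

With Reactor, the countDown call would sit in doOnComplete as in the snippet above, and probably also in doOnError, so that a failed request releases the latch instead of leaving the test to run into the timeout.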
Upvotes: 1