rupweb
rupweb

Reputation: 3328

Wait for a webclient remote flux with awaitility

I am testing a webclient that returns a flux and I need to wait for it to initialise properly. Like this

I setup a flux as null

private Flux<Event> events = null;

Then call a webclient to get the Flux from a remote URL

Flux<String> events = getFlux(guid);

The webclient is

WebClient client; // already setup with headers and URL

public Flux<String> getFlux(String guid) {

    return client.get()
        .uri(Props.getBaseEndpoint() + "?id=" + guid)
        .retrieve()
        .onStatus(status -> status.value() == 401,clientResponse -> Mono.empty())
        .bodyToFlux(String.class)
        .timeout(Duration.ofSeconds(Props.getTimeout()));
}

The getFlux method appears to return before the Flux is completely initialised. So I want to wait a couple of seconds for it:

Awaitility.await().atMost(5, TimeUnit.SECONDS).until(isFluxInitialised());

where something like:

public Callable<Boolean> isFluxInitialised() {
    return new Callable<Boolean>() {
        public Boolean call() throws Exception {
            if (events != null)
                return true;
            
            return false;
        }
    };
}

Waiting for the Flux to be not null still causes a race condition in the test. I can't figure out what to wait for so that the getFlux has returned an initialised Flux that can then be subscribed to. The test continues with a subscription to the flux as below but finishes before the test data that's sent to the remote endpoint can arrive in the subscription.

events.subscribe(e -> Logs.Info("event: " + e));

Here's the intellisense

enter image description here

Upvotes: 0

Views: 814

Answers (3)

Alex
Alex

Reputation: 5934

There is no reason to introduce blocking operator blockFirst() into the flow. Still not sure about the use case but technically you are trying to wait for the first element from the Flux. The same could be achieved without blocking

AtomicBoolean elementAvailable = new AtomicBoolean();

getFlux()
        .doOnNext(rec -> elementAvailable.set(true))
        .subscribe();

Awaitility.await().atMost(5, TimeUnit.SECONDS).until(elementAvailable::get);

Upvotes: 1

rupweb
rupweb

Reputation: 3328

Thanks to @Alex answer about assembly v subscription time. That's the problem. I actually got the awaitility to work properly by moving it off waiting for "assembly" time (which I couldn't get to work) and instead to wait for the 1st subscription, by using blockFirst() as below:

Awaitility.await().atMost(5, TimeUnit.SECONDS).until(isFluxInitialised());

and

public Callable<Boolean> isFluxInitialised() {
    return new Callable<Boolean>() {
        public Boolean call() throws Exception {
            if (events.blockFirst() != null)
                return true;
            
            return false;
        }
    };
}   

Upvotes: 0

Alex
Alex

Reputation: 5934

Not sure I understand the logic of the isFluxInitialised but looking at the description you could be confused by Assembly vs Subscription time. Also, please note that subscribe is not synchronous operation and your program could exit before results are available.

I would suggest to start with unit test using StepVerifier to make sure your flow is correct.

StepVerifier.create(getFlux(...))
    .expectNextCount(count)
    .verifyComplete();

If you need to wait until Flux is complete in your logic you can use common pattern using CountDownLatch. The same can be achieved with Awaitility if you like.

CountDownLatch completionLatch = new CountDownLatch(1);


getFlux(...)
   .doOnComplete(completionLatch::countDown)
   .doOnNext(e -> Logs.Info("event: " + e))
   .subscribe();


completionLatch.await();

Upvotes: 1

Related Questions