Salocin
Salocin

Reputation: 403

RxJava wait for single Event, RxJava

How do you wait for a single value to come on an observable with a timout?

I am looking for something like:

Observable<Acknowledgement> acknowledgementObservable;
port.send(new Message());
Optional<Acknowledgement> ack = acknowledgementObservable.getFirst(100, TimeUnit.MILLISECONDS);

Upvotes: 1

Views: 2203

Answers (2)

Salocin
Salocin

Reputation: 403

You can do so by adding .timeout() and .onErrorComplete() .blockingGet()

So in this example it would be:

    Acknowledgement ack = acknowledgementObservable
                              .firstElement()
                              .timeout(3, TimeUnit.SECONDS)
                              .onErrorComplete()
                              .blockingGet();

If the timeout is hit, ack will be null.

Upvotes: 0

Alexei Kaigorodov
Alexei Kaigorodov

Reputation: 13525

First, convert Observable to CompletableFuture as described in Converting between Completablefuture and Observable:

Observable<Acknowledgement> acknowledgementObservable;
port.send(new Message());
CompletableFuture<T> future = new CompletableFuture<>();
acknowledgementObservable
    .doOnError(future::completeExceptionally)
    .single()
    .forEach(future::complete);

Then, wait for the event using timeout:

Acknowledgement ack = future.get(100, TimeUnit.MILLISECONDS);

It throws TimeoutException if timeout occurs.

Upvotes: 1

Related Questions