Kamiel Wanrooij

Reputation: 12404

Check if any subscriber throws an exception in RxJava

I'm trying to make a reactive RabbitMQ listener that allows us to process each message with multiple subscribers. We only want to ack the message when all subscribers finish successfully.

This is my current setup:

Observable
    .fromCallable(() -> {
        // Set up connection
        return consumer;
    })
    .flatMap(consumer -> Observable
        .fromCallable(consumer::nextDelivery)
        .doOnError(throwable -> {
            try {
                consumer.getChannel().getConnection().close();
            } catch (IOException ignored) { }
        })
        .repeat())
    .retryWhen(observable -> observable.delay(3, TimeUnit.SECONDS))
    .publish()
    .refCount();

This will set up the connection once, share all messages with all subscribers, and reconnect after 3 seconds if this fails anywhere due to e.g. Rabbit becoming unavailable.

What I still need to do is ack or nack the message. Since all our message handlers are idempotent, I can just requeue the message if any handler happens to fail, to ensure every handler finishes successfully.

Is there any way to tell if any subscriber failed? I was thinking about subscribing with something like this:

public void subscribe(Action1 action) {
    deliveries
        .flatMap(delivery -> Observable
            .just(delivery)
            .doOnNext(action)
            .doOnError(throwable -> {
                // nack
            })
            .doOnCompleted(() -> {
                // ack
            })
        )
        .subscribe();
}

But this obviously acks or nacks on the first failure or success. Is there any way to merge all subscribers of a particular message and then check for errors or completion?

I also tried using an AtomicInteger to count all subscribers and then count the successes and failures. But whenever someone subscribes or unsubscribes during processing, there's no trivial way of keeping that count in sync without blocking the entire processing step.

I could also give every subscriber an Observable<Delivery> and make them return that with an error or a completion, similar to retryWhen (as sort of a reply channel), but I have no way of generating the required number of observables upfront and merging them afterwards.

Any ideas? Thanks for reading!

Upvotes: 1

Views: 1375

Answers (2)

Tassos Bassoukos

Reputation: 16152

You want to use Observable.mergeDelayError, and then one of the .onError* methods.

The first one will propagate the error only after all observables have completed/errored out; the second will allow you to handle the error after processing has finished.
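To make the delayed-error behaviour described above concrete, here is a minimal sketch in plain Java. It deliberately uses CompletableFuture from the standard library instead of RxJava, so it only mimics the semantics of mergeDelayError: every handler runs to completion, failures are collected rather than propagated immediately, and the ack/nack decision is made once at the end. The names AckDecision and runAll are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class AckDecision {

    /**
     * Runs every handler against the message, waits for all of them to finish,
     * and returns the errors that were thrown. An empty list means "ack".
     */
    public static <T> List<Throwable> runAll(T message, List<Consumer<T>> handlers) {
        List<CompletableFuture<Throwable>> results = new ArrayList<>();
        for (Consumer<T> handler : handlers) {
            results.add(CompletableFuture
                .runAsync(() -> handler.accept(message))
                // Turn the outcome into a value (null on success) so that
                // one failing handler cannot short-circuit the others.
                .handle((ignored, error) -> error));
        }

        List<Throwable> errors = new ArrayList<>();
        for (CompletableFuture<Throwable> result : results) {
            Throwable error = result.join(); // blocks until this handler is done
            if (error != null) {
                // The error may arrive wrapped in a CompletionException.
                errors.add(error);
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        List<Consumer<String>> handlers = new ArrayList<>();
        handlers.add(m -> System.out.println("handled " + m));
        handlers.add(m -> { throw new IllegalStateException("boom"); });

        // Decide once, after every handler has finished.
        List<Throwable> errors = runAll("msg-1", handlers);
        System.out.println(errors.isEmpty() ? "ack" : "nack (requeue)");
    }
}
```

Because the decision is taken only after all handlers have reported back, a single failure leads to exactly one nack, which matches the requeue-on-any-failure approach in the question.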

Edit: To get the counts, count the successes:

Object message = ...;
List<Action1<Object>> actions = ...;
Observable.from(actions)
    .map(action -> Observable
        .fromCallable(() -> {
            action.call(message); // run the handler; an exception becomes onError
            return 1;             // emit one item per success
        })
        // swallow the failure so it contributes nothing to the count
        .onErrorResumeNext(e -> Observable.<Integer>empty()))
    .compose(Observable::merge)
    .count(); // number of handlers that succeeded

It's a bit convoluted and it could be made clearer: make the calls, ignore errors from them, count successes.

Upvotes: 1

paul

Reputation: 13481

You can use onErrorResumeNext to handle the exception propagated from your pipeline and send the nack there, then use onCompleted for the ack.

Here is an example:

// count is an int field on the enclosing class, used to cap the retries at 3
Observable.just(null)
    .map(Object::toString) // throws a NullPointerException to force an error
    .doOnError(failure -> System.out.println("Error:" + failure.getCause()))
    .retryWhen(errors -> errors
            .doOnNext(o -> count++)
            .flatMap(t -> count > 3
                ? Observable.error(t)
                : Observable.just(null).delay(100, TimeUnit.MILLISECONDS)),
        Schedulers.newThread())
    .onErrorResumeNext(t -> {
        // all retries failed: this is where the nack would go
        System.out.println("Error after all retries:" + t.getCause());
        return Observable.just("I save the world!");
    })
    .subscribe(s -> System.out.println(s));

Upvotes: 2
