Reputation: 12404
I'm trying to make a reactive RabbitMQ listener that allows us to process each message with multiple subscribers. We only want to ack the message when all subscribers finish successfully.
This is my current setup:
Observable
    .fromCallable(() -> {
        // Set up connection
        return consumer;
    })
    .flatMap(consumer -> Observable
        .fromCallable(consumer::nextDelivery)
        .doOnError(throwable -> {
            try {
                consumer.getChannel().getConnection().close();
            } catch (IOException ignored) { }
        })
        .repeat())
    .retryWhen(observable -> observable.delay(3, TimeUnit.SECONDS))
    .publish()
    .refCount();
This will set up the connection once, share all messages with all subscribers, and reconnect after 3 seconds if this fails anywhere due to e.g. Rabbit becoming unavailable.
What I still need to do is ack or nack the message. Since all our message handlers are idempotent, I can just requeue the message if any handler happens to fail, to ensure every handler finishes successfully.
Is there any way to tell if any subscriber failed? I was thinking about subscribing with something like this:
public void subscribe(Action1 action) {
    deliveries
        .flatMap(delivery -> Observable
            .just(delivery)
            .doOnNext(action)
            .doOnError(throwable -> {
                // nack
            })
            .doOnCompleted(() -> {
                // ack
            })
        )
        .subscribe();
}
But this obviously acks or nacks on the first failure or success. Is there any way to merge all subscribers of a particular message and then check for errors or completion?
I also tried something like using an AtomicInteger to count all subscribers and then count the successes / failures, but obviously whenever someone subscribes or unsubscribes during processing there's no trivial way of syncing that without blocking the entire processing step.
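One way around the counting problem is to take a snapshot of the subscribers per message, so that anyone subscribing or unsubscribing mid-processing only affects later messages. Below is a minimal, library-free sketch of that idea in plain Java (not RxJava); `SnapshotDispatcher` and its methods are hypothetical names made up for illustration, and handlers are assumed to signal failure by throwing.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical dispatcher; not part of RxJava or the RabbitMQ client. */
final class SnapshotDispatcher<T> {
    // Copy-on-write: an iteration sees a stable snapshot even while
    // handlers are being added or removed from other threads.
    private final List<Consumer<T>> handlers = new CopyOnWriteArrayList<>();

    void subscribe(Consumer<T> handler) { handlers.add(handler); }

    void unsubscribe(Consumer<T> handler) { handlers.remove(handler); }

    /** Runs every handler registered when iteration starts; true means "ack". */
    boolean dispatch(T message) {
        boolean allSucceeded = true;
        for (Consumer<T> handler : handlers) { // iterates over the snapshot
            try {
                handler.accept(message);
            } catch (RuntimeException e) {
                allSucceeded = false; // keep going so the other handlers still run
            }
        }
        return allSucceeded; // caller acks on true, nacks with requeue on false
    }
}
```

The caller would ack when `dispatch` returns true and nack (requeue) otherwise; because the handlers are idempotent, re-running the ones that already succeeded is harmless.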
I could also give every subscriber an Observable<Delivery> and make them return that with an error or a completion, similar to retryWhen (as sort of a reply channel), but I have no way of generating the required number of observables upfront and merging them afterwards.
Any ideas? Thanks for reading!
Upvotes: 1
Views: 1375
Reputation: 16152
You want to use Observable.mergeDelayError, and then one of the .onError* methods.
The first one will propagate the error only after all observables have completed/errored out; the second will allow you to handle the error after processing has finished.
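To illustrate the "delay the error until everyone has finished, then handle it" behaviour without depending on RxJava, here is a rough JDK-only analogue using `CompletableFuture.allOf`, which completes only once every input future has completed. This is a sketch, not the RxJava API: `DelayedErrorAck`, `AckDecision`, and `process` are hypothetical names, and each handler is assumed to return a `CompletableFuture<Void>` as its reply.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical helper; the names here are illustrative only. */
final class DelayedErrorAck {
    enum AckDecision { ACK, NACK_REQUEUE }

    /** Starts every handler, waits for all of them, and only then decides. */
    static <T> CompletableFuture<AckDecision> process(
            T message, List<Function<T, CompletableFuture<Void>>> handlers) {
        CompletableFuture<?>[] replies = handlers.stream()
                .map(handler -> invoke(handler, message))
                .toArray(CompletableFuture[]::new);
        // allOf completes after every reply has completed, normally or
        // exceptionally, so a failure never cuts the other handlers short.
        return CompletableFuture.allOf(replies)
                .handle((ignored, error) ->
                        error == null ? AckDecision.ACK : AckDecision.NACK_REQUEUE);
    }

    // Turns a handler that throws synchronously into a failed future.
    private static <T> CompletableFuture<Void> invoke(
            Function<T, CompletableFuture<Void>> handler, T message) {
        try {
            return handler.apply(message);
        } catch (RuntimeException e) {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }
}
```

The `handle` step plays the role of the `.onError*` operator: it sees the failure only after all handlers are done and maps it to a nack.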
Edit: To get the counts, count the successes:
Object message = ...;
List<Action1<Object>> actions = ...;
Observable.from(actions)
    .flatMap(action -> Observable
        .fromCallable(() -> {
            action.call(message); // run the handler lazily, on subscription
            return 1;             // emit a 1 for each success
        })
        // a failed handler emits nothing, so it is not counted
        .onErrorResumeNext(e -> Observable.empty()))
    .count();
It's a bit convoluted and it could be made clearer: make the calls, ignore errors from them, count successes.
Upvotes: 1
Reputation: 13481
You can use onErrorResumeNext to handle the exception propagated from your pipeline and nack there, and then use your onCompleted callback as the ack.
Here is an example:
// count must be an int field here; a lambda cannot modify a local variable
Observable.just(null).map(Object::toString)
    .doOnError(failure -> System.out.println("Error:" + failure.getCause()))
    .retryWhen(errors -> errors.doOnNext(o -> count++)
            .flatMap(t -> count > 3 ? Observable.error(t) : Observable.just(null).delay(100, TimeUnit.MILLISECONDS)),
        Schedulers.newThread())
    .onErrorResumeNext(t -> {
        System.out.println("Error after all retries:" + t.getCause());
        return Observable.just("I save the world!");
    })
    .subscribe(s -> System.out.println(s));
Upvotes: 2