Ruslan Akhundov

Reputation: 2216

Vertx Rx-Java: reasons for eventBus subscriber being unsubscribed

I am using vertx with rx-java.

I have a verticle which subscribes to events on eventBus with specific addresses:

eventBus.localConsumer(some_address)
        .toObservable()
        .subscribe(message -> {
                     ...
                     message.reply(...);
                   })
... same for other addresses...

And other verticles are sending events using:

eventBus.rxSend(some_address, message, new DeliveryOptions().setSendTimeout(60000));

Verticles are created via RxHelper.deployVerticle.

Everything worked fine at first, but after some time one of the addresses got unsubscribed, and all requests to it now fail with ReplyException: No handlers for address some_address. All other addresses are still subscribed.

I don't see any vertx errors in the logs.

What could cause a consumer to be unsubscribed automatically from the particular address it was listening to?

As far as I understand, a request that fails with an error or a timeout shouldn't lead to unsubscription, so I don't understand what could cause this behaviour. (I don't have any explicit unsubscribe calls at all.)

Upvotes: 2

Views: 323

Answers (1)

Ruslan Akhundov

Reputation: 2216

It turned out that the problem was that the code before message.reply sometimes threw an exception:

eventBus.localConsumer(some_address)
    .toObservable()
    .subscribe(message -> {
                 ... <-- exception here
                 message.reply(...);
               })
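
Why a single exception silences the address: as far as I can tell, an error that escapes the subscribe callback ends the Rx subscription, and the consumer behind toObservable() is unregistered along with it. Below is a plain-Java toy model of that behaviour. It is not Vert.x or RxJava code; ToyAddress and its methods are made up purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy stand-in for one event-bus address. Hypothetical, NOT the Vert.x API.
class ToyAddress {
    private Consumer<String> handler;                   // null models "No handlers for address"
    final List<Throwable> dropped = new ArrayList<>();  // errors that ended the subscription

    void subscribe(Consumer<String> onNext) { this.handler = onNext; }

    boolean hasHandler() { return handler != null; }

    // Delivers one message; returns false when nobody is registered.
    boolean send(String body) {
        if (handler == null) return false;
        try {
            handler.accept(body);
        } catch (RuntimeException e) {
            // Rx-style contract: an error escaping onNext ends the subscription.
            dropped.add(e);
            handler = null;
        }
        return true;
    }
}

class ToyAddressDemo {
    public static void main(String[] args) {
        ToyAddress address = new ToyAddress();
        address.subscribe(body -> {
            if (body.isEmpty()) throw new IllegalArgumentException("empty body");
        });
        System.out.println(address.send("ok"));  // true: handled normally
        System.out.println(address.send(""));    // true: delivered, but the handler throws
        System.out.println(address.send("ok"));  // false: the handler is gone
    }
}
```

The third send fails even though nothing ever called unsubscribe explicitly, which matches the symptom in the question.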

Simple fix:

eventBus.localConsumer(some_address)
    .toObservable()
    .subscribe(message -> {
                 try {
                     ... <-- exception here
                     message.reply(...);
                 } catch (Exception e) {
                     ...handle exception...
                     // Message has no error(...) method; fail(code, text) sends a failure reply
                     message.fail(...);
                 }
               })
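
Since the same pattern is repeated for every address, the try/catch could be factored out into a small wrapper. This is only a sketch in plain Java: SafeHandler and wrap are hypothetical names, not part of Vert.x or RxJava, and the demo uses strings in place of real messages.

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical helper, not part of Vert.x or RxJava.
final class SafeHandler {
    private SafeHandler() {}

    // Returns a handler that routes any exception from body to onFailure
    // instead of letting it escape to the caller.
    static <T> Consumer<T> wrap(Consumer<T> body, BiConsumer<T, Exception> onFailure) {
        return message -> {
            try {
                body.accept(message);
            } catch (Exception e) {
                onFailure.accept(message, e);
            }
        };
    }
}

class SafeHandlerDemo {
    public static void main(String[] args) {
        Consumer<String> handler = SafeHandler.wrap(
            text -> System.out.println("length: " + text.length()),  // throws on null
            (text, e) -> System.out.println("failed: " + e.getClass().getSimpleName()));

        handler.accept("abc");  // prints "length: 3"
        handler.accept(null);   // prints "failed: NullPointerException"
        handler.accept("de");   // prints "length: 2", the handler is still usable
    }
}
```

In the verticle, the wrapped handler would presumably be passed to subscribe as handler::accept, with the failure callback sending a failure reply so the sender does not have to wait for its timeout. The exact callback type that subscribe expects depends on the RxJava version in use.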

Upvotes: 1
