Reputation: 2216
I am using Vert.x with RxJava.
I have a verticle that subscribes to events on the event bus at specific addresses:
eventBus.localConsumer(some_address)
    .toObservable()
    .subscribe(message -> {
        ...
        message.reply(...);
    });
... same for other addresses ...
And other verticles are sending events using:
eventBus.rxSend(some_address, message, new DeliveryOptions().setSendTimeout(60000));
Verticles are deployed via RxHelper.deployVerticle.
Everything worked fine at first, but after some time one of the addresses got unsubscribed, and all requests to it now fail with "ReplyException: No handlers for address some_address". All other addresses are still subscribed.
I don't see any vertx errors in the logs.
What could cause a consumer to be unsubscribed automatically from the particular address it was listening to?
As far as I understand, a request that fails with an error or a timeout should not lead to unsubscription, so I don't see what could cause this behaviour. (I don't have any explicit unsubscribe calls at all.)
Upvotes: 2
Views: 323
Reputation: 2216
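It turned out that the problem was that the code before message.reply sometimes threw an exception. In RxJava, an exception thrown from the subscribe callback terminates the subscription, which in turn unregisters the consumer from that address: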
eventBus.localConsumer(some_address)
    .toObservable()
    .subscribe(message -> {
        ... <-- exception here
        message.reply(...);
    });
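To make the failure mode concrete, here is a dependency-free toy model. It is not Vert.x or RxJava code: ToyBus and its methods are hypothetical names, and the class only imitates the one rule that matters here, namely that a callback which throws loses its subscription.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Toy model only: NOT Vert.x or RxJava code. It imitates one rule of an Rx
// subscription: if the onNext callback throws, the subscription is disposed.
class ToyBus {
    private final Map<String, Consumer<String>> consumers = new HashMap<>();

    // Hypothetical stand-in for
    // localConsumer(address).toObservable().subscribe(callback).
    void subscribe(String address, Consumer<String> callback) {
        consumers.put(address, callback);
    }

    // Delivers a message. Returns false when no handler is registered,
    // which corresponds to the "No handlers for address" situation.
    boolean send(String address, String body) {
        Consumer<String> callback = consumers.get(address);
        if (callback == null) {
            return false;
        }
        try {
            callback.accept(body);
        } catch (RuntimeException e) {
            // The callback threw: drop the subscription, the way a disposed
            // Rx subscription unregisters its underlying consumer.
            consumers.remove(address);
        }
        return true;
    }
}
```

In this model, the first message that makes the callback throw is still delivered, but every later send to the same address finds no handler, which matches the symptom described in the question.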
Simple fix:
eventBus.localConsumer(some_address)
    .toObservable()
    .subscribe(message -> {
        try {
            ... <-- exception here
            message.reply(...);
        } catch (Exception e) {
            ...handle exception...
            message.fail(...);
        }
    });
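If several addresses need the same protection, the try/catch can be factored into a small wrapper. The sketch below is plain Java with no Vert.x or RxJava dependency; SafeHandlers and guarded are hypothetical names, not part of either library. It assumes that the failure callback is where you would log the error and fail the message.

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical helper, not part of Vert.x or RxJava.
final class SafeHandlers {
    private SafeHandlers() {}

    // Wraps a handler so that an exception it throws is passed to onFailure
    // instead of propagating to the caller (and ending the subscription).
    static <T> Consumer<T> guarded(Consumer<T> handler,
                                   BiConsumer<T, Exception> onFailure) {
        return message -> {
            try {
                handler.accept(message);
            } catch (Exception e) {
                onFailure.accept(message, e);
            }
        };
    }
}
```

Since RxJava uses its own functional interfaces rather than java.util.function.Consumer, the wrapped handler would be passed as a method reference, for example .subscribe(SafeHandlers.guarded(handler, onFailure)::accept). Note that an exception thrown by onFailure itself is not caught, so that callback should be kept simple.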
Upvotes: 1