Reputation: 2009
So I have a reactive Consumer with RabbitMQ as the Binder implementation. It basically looks like this:
@Override
public void accept(Flux<Message<Event>> eventMessages) {
    eventMessages
        .buffer(Duration.of(5, ChronoUnit.SECONDS))
        .flatMap(messages -> ... )
        .buffer(Duration.of(60, ChronoUnit.SECONDS))
        .flatMap(messages -> ...)
        // Acknowledge only after every upstream step has emitted successfully
        .doOnNext(this::acknowledgeMessage)
        .subscribe();
}

private void acknowledgeMessage(Message<Event> message) {
    var channel = message.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class);
    var deliveryTag = message.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class);
    try {
        // basicAck declares IOException, so it cannot be used as a plain method reference
        channel.basicAck(deliveryTag, false);
        log.info("Message acknowledged");
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}
The thing is that I only need to acknowledge a message once all of the operations have finished successfully, but I'm pretty sure the channel.basicAck call is blocking.
Is there a reactive alternative to this? And, just in case, how will that blocking call affect overall performance? Thanks!
Upvotes: 0
Views: 391
Reputation: 174484
basicAck can block (but usually doesn't, as long as you are using different connections for publishers and consumers, which is recommended).
You could use a .publishOn() before the doOnNext() to hand it off to another thread.
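To make the hand-off idea concrete, here is a minimal JDK-only sketch rather than Reactor code. It assumes a hypothetical blockingAck method standing in for channel.basicAck, and the AckHandoff class and the "ack-thread" name are invented for illustration. Every ack runs on one dedicated thread, so the calling thread never executes the potentially blocking call itself. In the real pipeline, a .publishOn(Schedulers.boundedElastic()) placed before the doOnNext() would play the role of the executor.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class AckHandoff {

    // Hypothetical stand-in for channel.basicAck(deliveryTag, false).
    // Returns the name of the thread it ran on so the hand-off is observable.
    static String blockingAck(long deliveryTag) {
        return Thread.currentThread().getName();
    }

    // Submits every ack to a dedicated single thread and waits for all of them.
    // Returns the thread name each ack was executed on, in submission order.
    static List<String> ackAll(List<Long> deliveryTags) {
        ExecutorService ackExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "ack-thread"));
        try {
            List<CompletableFuture<String>> pending = deliveryTags.stream()
                    .map(tag -> CompletableFuture.supplyAsync(() -> blockingAck(tag), ackExecutor))
                    .collect(Collectors.toList());
            return pending.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            // Let the worker thread exit once the queued acks are done
            ackExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(ackAll(List.of(1L, 2L, 3L)));
    }
}
```

A single-threaded executor is used here so acks are issued one at a time and in order; whether that matters for your setup is an assumption worth checking against how your channel is shared.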
Upvotes: 1