Dmytro

Reputation: 2009

Reactive Spring Cloud Stream with RabbitMQ binder: Manual ACK

I have a reactive Consumer with RabbitMQ as the binder implementation. It basically looks like this:

    @Override
    public void accept(Flux<Message<Event>> eventMessages) {
        eventMessages
                .buffer(Duration.of(5, ChronoUnit.SECONDS))
                .flatMap(messages -> ... )
                .buffer(Duration.of(60, ChronoUnit.SECONDS))
                .flatMap(messages -> ...)
                .doOnNext(this::acknowledgeMessage)
                .subscribe();
    }


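    private void acknowledgeMessage(Message<Event> message) {
        var channel = message.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class);
        var deliveryTag = message.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class);

        try {
            // multiple = false: acknowledge only this delivery tag
            channel.basicAck(deliveryTag, false);
        } catch (IOException e) {
            // basicAck declares IOException; rethrow unchecked so the method reference compiles
            throw new UncheckedIOException(e);
        }

        log.info("Message acknowledged");
    }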

I only need to acknowledge a message once all of the operations have finished successfully, but I'm pretty sure the channel.basicAck call is blocking. Is there a reactive alternative to it? And, just in case, how will that blocking call affect overall performance? Thanks!

Upvotes: 0

Views: 391

Answers (1)

Gary Russell

Reputation: 174484

basicAck can block (but usually doesn't, as long as you are using different connections for publishers and consumers, which is recommended).

You could use a .publishOn() before the doOnNext() to hand it off to another thread.
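
As a rough illustration of that hand-off, here is a minimal sketch. It assumes reactor-core is on the classpath and that Schedulers.boundedElastic() is an acceptable scheduler for the potentially blocking call. The class AckOffloadSketch and its acknowledge method are hypothetical stand-ins for the question's acknowledgeMessage, and Flux.range replaces the real message stream.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class AckOffloadSketch {

    // Records the name of the thread each (simulated) ack ran on.
    static final List<String> ackThreads = new CopyOnWriteArrayList<>();

    // Hypothetical stand-in for acknowledgeMessage(...); a real version
    // would call channel.basicAck(deliveryTag, false) here.
    static void acknowledge(Integer deliveryTag) {
        ackThreads.add(Thread.currentThread().getName());
    }

    static List<String> run() {
        ackThreads.clear();
        Flux.range(1, 5)
                // Everything downstream of publishOn runs on the given scheduler,
                // so the ack no longer executes on the upstream thread.
                .publishOn(Schedulers.boundedElastic())
                .doOnNext(AckOffloadSketch::acknowledge)
                // blockLast is only here to keep the demo alive until completion;
                // the consumer in the question would call subscribe() instead.
                .blockLast(Duration.ofSeconds(5));
        return List.copyOf(ackThreads);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In the consumer from the question, the equivalent change would be to place .publishOn(...) directly before .doOnNext(this::acknowledgeMessage). Which scheduler fits best depends on the application, so treat the choice above as an assumption.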

Upvotes: 1
