Reputation: 792
I'm trying to figure out if Project-Reactor is a viable option for my use case.
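I need high consistency in my application. I want to consume a message from one queue, publish it to a second queue, and acknowledge the original message only after the publish has been committed.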
With the normal RabbitMQ Java library it would roughly look something like this:
var connectionFactory = new ConnectionFactory();
var connection = connectionFactory.newConnection();
var channel = connection.createChannel();
channel.txSelect();
channel.queueDeclare("queue.A", true, false, false, null);
channel.queueDeclare("queue.B", true, false, false, null);
channel.basicConsume("queue.A", false, (tag, delivery) ->
{
    String data = new String(delivery.getBody(), StandardCharsets.UTF_8);
    System.out.println("Received: " + data);
    channel.basicPublish("", "queue.B", null, data.getBytes(StandardCharsets.UTF_8));
    channel.txCommit();
    System.out.println("Sending ACK");
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    channel.txCommit();
}, tag -> System.err.println("Tag " + tag + " failed"));
This nicely prints
Received: DATA
Sending ACK
Using Reactor-RabbitMQ I've come up with the following:
var options = new ReceiverOptions();
var receiver = RabbitFlux.createReceiver(options);
var sender = RabbitFlux.createSender();
sender.declare(QueueSpecification.queue("queue.A")).block();
sender.declare(QueueSpecification.queue("queue.B")).block();
sender
    .send(receiver.consumeManualAck("queue.A")
        .doOnError(ex -> ex.printStackTrace())
        .doOnEach(s ->
        {
            s.get().ack();
            print("ACK");
        })
        .map(ad ->
        {
            print("MAP");
            return new OutboundMessage("", "queue.B", ad.getBody());
        }))
    .doOnEach(v ->
    {
        print("SUB");
    })
    .subscribe();
However, this prints
ACK
MAP
This is obviously not the right order, because I'm calling doOnEach before map. But the AcknowledgableDelivery is no longer available after I've mapped it to an OutboundMessage for the sender.
Note that SUB is never printed, probably because the send subscription is continuous and never reaches the terminated state of its Mono<Void>.
My knowledge of Reactor is not that extensive, so I feel like I'm missing something I could use here. Is it possible to do this neatly with Reactor or should I forget about it?
Footnote: I'm using var because I'm on Java 11, and the print statements are there to show the order in which things are executed.
Upvotes: 3
Views: 1259
Reputation: 50
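I do not think it is possible to acknowledge inside the send method. Instead, you can publish each message in its own send call, so that the delivery is still in scope when the send finishes: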
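receiver.consumeManualAck("queue.A")
    .flatMap(ad -> {
        Mono<OutboundMessage> outboundMessageMono =
            Mono.just(new OutboundMessage("", "queue.B", ad.getBody()));
        return sender.send(outboundMessageMono)
            .doFinally(signalType -> {
                if (signalType == SignalType.ON_COMPLETE) {
                    ad.ack();        // send finished normally: acknowledge the original
                } else {
                    ad.nack(false);  // error or cancellation: reject without requeueing
                }
            });
    })
    .subscribe(); // nothing is consumed until the pipeline is subscribed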
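To make the intended ordering easier to see without a broker, here is a minimal, library-agnostic sketch of the same idea in plain Java 11. It is only a model: Delivery, forward, and the publish function are hypothetical names invented for this illustration, not part of Reactor-RabbitMQ, and CompletableFuture stands in for the Mono<Void> returned by send. The point it shows is that chaining the publish per message keeps the delivery in scope, so it can be acked on success and nacked on failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical model of "publish first, then ack"; none of these types come from Reactor-RabbitMQ.
class AckAfterPublish {

    /** Stand-in for an incoming delivery that can be acked or nacked. */
    static final class Delivery {
        final String body;
        final List<String> log;

        Delivery(String body, List<String> log) {
            this.body = body;
            this.log = log;
        }

        void ack()  { log.add("ACK " + body); }
        void nack() { log.add("NACK " + body); }
    }

    /**
     * Publishes the body of one delivery, then acks it if the publish
     * succeeded or nacks it if the publish failed.
     */
    static CompletableFuture<Void> forward(
            Delivery delivery,
            Function<String, CompletableFuture<Void>> publish) {
        return publish.apply(delivery.body).<Void>handle((ignored, error) -> {
            if (error == null) {
                delivery.ack();
            } else {
                delivery.nack();
            }
            return null;
        });
    }

    /** Forwards one good and one failing message and returns the event log. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        // Fake publisher: fails for the body "BAD", succeeds otherwise.
        Function<String, CompletableFuture<Void>> publish = body -> {
            if (body.equals("BAD")) {
                return CompletableFuture.<Void>failedFuture(
                        new IllegalStateException("publish failed"));
            }
            log.add("PUBLISH " + body);
            return CompletableFuture.<Void>completedFuture(null);
        };
        forward(new Delivery("DATA", log), publish).join();
        forward(new Delivery("BAD", log), publish).join();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this model the ACK entry can only ever appear after the matching PUBLISH entry, which is the order the question asks for. Whether a completed send is a strong enough guarantee for your consistency requirements is a separate question that this sketch does not answer.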
Upvotes: 1