Ricky Habegger

Reputation: 1

How to guarantee the processing of MQTT messages with Quarkus

I'm building a Java application with Quarkus 3.9.4, using the SmallRye connectors and ActiveMQ Artemis 2.33 as the broker. The application receives MQTT messages, processes them, and sends them to an AMQP queue. I need to ensure that no messages are lost along the way, but I don't mind duplicates, so I'm using QoS 1 with Clean Session set to false.

When my application goes offline for some reason (usually a network problem), I expect the broker to keep the messages in a queue so that the application can consume them as soon as it comes back online. This works as long as the application doesn't stay offline for too long and manages to process the whole backlog once it reconnects. The problem appears, even with QoS 1, when a lot of messages have accumulated: if the application stops working again before it finishes processing them, the messages that were already in the SmallRye queue are lost. Is there any way to confirm that a message has been received only after it has been successfully processed, so that unprocessed messages remain in the queue? Or is there a better way of doing this?

I tried using the @Acknowledgment(Acknowledgment.Strategy.MANUAL) annotation and sending the ack only at the end of processing, but even when I don't send the ack manually, the broker doesn't deliver the message again.
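
To make the desired behavior concrete, here is a minimal plain-Java sketch of "ack only after successful processing". It does not use Quarkus, SmallRye, or any MQTT client: `InMemoryBroker` and all of its methods are hypothetical names invented for this illustration, and the "crash" is simulated. It only models the semantics I'm after, where a message that was delivered but never acked goes back to the queue when the consumer disconnects.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AckAfterProcessingSketch {

    /** Hypothetical stand-in for the broker; not an Artemis or SmallRye API. */
    static class InMemoryBroker {
        private final Deque<String> pending = new ArrayDeque<>();
        private final Map<Integer, String> inFlight = new LinkedHashMap<>();
        private int nextId = 1;

        void publish(String payload) {
            pending.addLast(payload);
        }

        /** Hands out the next message and keeps it in flight until acked; returns -1 if empty. */
        int deliver() {
            String payload = pending.pollFirst();
            if (payload == null) {
                return -1;
            }
            int id = nextId++;
            inFlight.put(id, payload);
            return id;
        }

        String payloadOf(int id) {
            return inFlight.get(id);
        }

        /** Only an explicit ack removes a message for good. */
        void ack(int id) {
            inFlight.remove(id);
        }

        /** Consumer went away: unacked messages return to the front of the queue, in order. */
        void onConsumerDisconnect() {
            List<String> unacked = new ArrayList<>(inFlight.values());
            inFlight.clear();
            for (int i = unacked.size() - 1; i >= 0; i--) {
                pending.addFirst(unacked.get(i));
            }
        }

        int remaining() {
            return pending.size() + inFlight.size();
        }
    }

    /** Consumes messages, acking each one only after it was processed; "crashes" after crashAfter messages. */
    static void consume(InMemoryBroker broker, List<String> processed, int crashAfter) {
        int handled = 0;
        while (true) {
            int id = broker.deliver();
            if (id == -1) {
                return; // nothing left to consume
            }
            if (handled == crashAfter) {
                // Simulated failure: the message was delivered but never processed or acked.
                broker.onConsumerDisconnect();
                return;
            }
            processed.add(broker.payloadOf(id)); // "processing" step
            broker.ack(id);                      // ack only after processing succeeded
            handled++;
        }
    }

    /** Runs the scenario: five messages, a crash after two, then a second session. */
    static List<String> run() {
        InMemoryBroker broker = new InMemoryBroker();
        for (int i = 1; i <= 5; i++) {
            broker.publish("m" + i);
        }
        List<String> processed = new ArrayList<>();
        consume(broker, processed, 2);                 // first session stops early
        consume(broker, processed, Integer.MAX_VALUE); // second session drains the rest
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [m1, m2, m3, m4, m5]
    }
}
```

In this sketch the third message is delivered during the first session but never acked, so it is redelivered in the second session instead of being lost. A crash between processing and ack would produce a duplicate instead, which is acceptable in my case.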

Upvotes: 0

Views: 193

Answers (0)
