Sumit Sood
Sumit Sood

Reputation: 485

How to move messages from one queue to another in RabbitMQ

In RabbitMQ,I have a failure queue, in which I have all the failed messages from different Queues. Now I want to give the functionality of 'Retry', so that administrator can again move the failed messages to their respective queue. The idea is something like that:

enter image description here

Above diagram is structure of my failure queue. After click on Retry link, message should move into original queue i.e. queue1, queue2 etc.

Upvotes: 4

Views: 8180

Answers (5)

FluX
FluX

Reputation: 1

Here is a more generic tool for some administrative/supporting tasks, the management-ui is not capable of.

Link: https://github.com/bkrieger1991/rabbitcli

It also allows you to fetch/move/dump messages from queues even with a filter on message-content or message-headers :)

Upvotes: 0

Sandro
Sandro

Reputation: 1061

To requeue a message you can use the receiveAndReply method. The following code will move all messages from the dlq-queue to the queue-queue:

do {
    val movedToQueue = rabbitTemplate.receiveAndReply<String, String>(dlq, { it }, "", queue)
} while (movedToQueue)

In the code example above, dlq is the source queue, { it } is the identity function (you could transform the message here), "" is the default exchange and queue is the destination queue.

Upvotes: 1

Thiago Cavalcanti
Thiago Cavalcanti

Reputation: 523

I also have implemented something like that, so I can move messages from a dlq back to processing. Link: https://github.com/kestraa/rabbit-move-messages

Upvotes: 0

Ankush
Ankush

Reputation: 519

It's not straight forward consume and publish. RabbitMQ is not designed in that way. it takes into consideration that exchange and queue both could be temporary and can be deleted. This is embedded in the channel to close the connection after single publish.

Assumptions: - You have a durable queue and exchange for destination ( to send to) - You have a durable queue for target ( to take from )

Here is the code to do so:

        import com.rabbitmq.client.Channel;
        import com.rabbitmq.client.QueueingConsumer;
        import org.apache.commons.lang.StringUtils;
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;
        import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;    

        public object shovelMessage(
                     String exchange,
                     String targetQueue,
                     String destinationQueue,
                     String host,
                     Integer port,
                     String user,
                     String pass,
                     int count) throws IOException, TimeoutException, InterruptedException {

                if(StringUtils.isEmpty(exchange) || StringUtils.isEmpty(targetQueue) || StringUtils.isEmpty(destinationQueue)) {
                    return null;
                }

                CachingConnectionFactory factory = new CachingConnectionFactory();
                factory.setHost(StringUtils.isEmpty(host)?internalHost.split(":")[0]:host);
                factory.setPort(port>0 ? port: Integer.parseInt(internalPort.split(":")[1]));
                factory.setUsername(StringUtils.isEmpty(user)? this.user: user);
                factory.setPassword(StringUtils.isEmpty(pass)? this.pass: pass);
                Channel tgtChannel = null;
                try {
                    org.springframework.amqp.rabbit.connection.Connection connection = factory.createConnection();

                    tgtChannel = connection.createChannel(false);
                    tgtChannel.queueDeclarePassive(targetQueue);

                    QueueingConsumer consumer = new QueueingConsumer(tgtChannel);
                    tgtChannel.basicQos(1);
                    tgtChannel.basicConsume(targetQueue, false, consumer);

                    for (int i = 0; i < count; i++) {
                        QueueingConsumer.Delivery msg = consumer.nextDelivery(500);
                        if(msg == null) {
        // if no message found, break from the loop.
                            break;
                        }
                        //Send it to destination Queue
                        // This repetition is required as channel looses the connection with 
                        //queue after single publish and start throwing queue or exchange not 
                        //found connection.
                        Channel destChannel = connection.createChannel(false);
                        try {
                            destChannel.queueDeclarePassive(destinationQueue);
    SerializerMessageConverter serializerMessageConverter = new SerializerMessageConverter();
     Message message = new Message(msg.getBody(), new MessageProperties());
                          Object o = serializerMessageConverter.fromMessage(message);
// for some reason msg.getBody() writes byte array which is read as a byte array // on the consumer end due to which this double conversion.
                            destChannel.basicPublish(exchange, destinationQueue, null, serializerMessageConverter.toMessage(o, new MessageProperties()).getBody());
                            tgtChannel.basicAck(msg.getEnvelope().getDeliveryTag(), false);
                        } catch (Exception ex) {
                            // Send Nack if not able to publish so that retry is attempted
                            tgtChannel.basicNack(msg.getEnvelope().getDeliveryTag(), true, true);
                            log.error("Exception while producing message ", ex);
                        } finally {
                            try {
                                destChannel.close();
                            } catch (Exception e) {
                                log.error("Exception while closing destination channel ", e);
                            }

                        }
                    }

                } catch (Exception ex) {
                    log.error("Exception while creating consumer ", ex);
                } finally {
                    try {
                        tgtChannel.close();
                    } catch (Exception e) {
                        log.error("Exception while closing destination channel ", e);
                    }
                }

                return null;

            }

Upvotes: 1

Arpan Gupta
Arpan Gupta

Reputation: 98

If you are looking for a Java code to do this, then you have to simply consume the messages you want to move and publish those messages to the required queue. Just look up on the Tutorials page of rabbitmq if you are unfamiliar with basic consuming and publishing operations.

Upvotes: 2

Related Questions