user3444718
user3444718

Reputation: 1605

RabbitMQ sending message in transaction

Is it possible to run below code in a transaction so if an exception is thrown in the business processing we can roll back the message we sent to the queue?

rabbitTemplate.convertAndSend("queue1", data);

//do some processing

rabbitTemplate.convertAndSend("queue2", data);

Need for this is what if something went wrong after sending message to queue1, but we're not able to send message to queue2. Or what if issue network or some other issue in sending message to queue.

Upvotes: 8

Views: 16182

Answers (1)

Gary Russell
Gary Russell

Reputation: 174484

If this code is running on a listener container thread (onMessage() or @RabbitListener) and the container and template both have setChannelTransacted(true) then the publishing (and delivery) will run in the same transaction; throwing an exception will cause everything to be rolled-back.

If this is in some arbitrary java class (not running on a container thread), then you need to start the transaction before the method runs...

    @Transactional
    public void send(String in) {
        this.template.convertAndSend("foo", in);
        if (in.equals("foo")) {
            throw new RuntimeException("test");
        }
        this.template.convertAndSend("bar", in);
    }

Here's a full Spring Boot application that demonstrates the feature...

@SpringBootApplication
@EnableTransactionManagement
public class So40749877Application {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(So40749877Application.class, args);
        Foo foo = context.getBean(Foo.class);
        try {
            foo.send("foo");
        }
        catch (Exception e) {}
        foo.send("bar");
        RabbitTemplate template = context.getBean(RabbitTemplate.class);
        // should not get any foos...
        System.out.println(template.receiveAndConvert("foo", 10_000));
        System.out.println(template.receiveAndConvert("bar", 10_000));
        // should be null
        System.out.println(template.receiveAndConvert("foo", 0));
        RabbitAdmin admin = context.getBean(RabbitAdmin.class);
        admin.deleteQueue("foo");
        admin.deleteQueue("bar");
        context.close();
    }

    @Bean
    public RabbitTemplate amqpTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setChannelTransacted(true);
        return rabbitTemplate;
    }

    @Bean
    public Queue foo() {
        return new Queue("foo");
    }

    @Bean
    public Queue bar() {
        return new Queue("bar");
    }

    @Bean
    public Foo fooBean() {
        return new Foo();
    }

    @Bean
    public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }

    public static class Foo {

        @Autowired
        private RabbitTemplate template;

        @Transactional
        public void send(String in) {
            this.template.convertAndSend("foo", in);
            if (in.equals("foo")) {
                throw new RuntimeException("test");
            }
            this.template.convertAndSend("bar", in);
        }

    }

}

EDIT

Transactions on the consumer side; this does not generally apply when using Spring because it manages the transaction, but when using the client directly...

Connection connection = cf.createConnection();
Channel channel = connection.createChannel(true);
channel.basicQos(1);
channel.txSelect();
CountDownLatch latch = new CountDownLatch(1);
channel.basicConsume("foo", new DefaultConsumer(channel) {

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
            byte[] body) throws IOException {
        System.out.println(new String(body));

        getChannel().txRollback(); // delivery won't be requeued; remains unacked

        if (envelope.isRedeliver()) {
            getChannel().basicAck(envelope.getDeliveryTag(), false);
            getChannel().txCommit(); // commit the ack so the message is removed
            getChannel().basicCancel(consumerTag);
            latch.countDown();
        }
        else { // first time, let's requeue
            getChannel().basicReject(envelope.getDeliveryTag(), true);
            getChannel().txCommit(); // commit the reject so the message will be requeued
        }
    }

});
latch.await();
channel.close();
connection.close();

Note the the txRollback does nothing in this case; only the ack (or reject) are transactional.

Upvotes: 10

Related Questions