Reputation: 1605
Is it possible to run the code below in a transaction, so that if an exception is thrown during the business processing we can roll back the message already sent to the queue?
rabbitTemplate.convertAndSend("queue1", data);
//do some processing
rabbitTemplate.convertAndSend("queue2", data);
The need for this: what if something goes wrong after the message is sent to queue1 and we are then unable to send the message to queue2? Or what if a network problem or some other issue occurs while sending a message to a queue?
Upvotes: 8
Views: 16182
Reputation: 174484
If this code is running on a listener container thread (onMessage() or @RabbitListener) and the container and template both have setChannelTransacted(true), then the publishing (and delivery) will run in the same transaction; throwing an exception will cause everything to be rolled back.
If this is in some arbitrary Java class (not running on a container thread), then you need to start the transaction before the method runs...
    @Transactional
    public void send(String in) {
        this.template.convertAndSend("foo", in);
        if (in.equals("foo")) {
            throw new RuntimeException("test");
        }
        this.template.convertAndSend("bar", in);
    }
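To make the commit/rollback behaviour concrete without a broker, here is a minimal plain-Java sketch. ToyTxChannel and ToyTxDemo are hypothetical names invented for illustration; they are not part of Spring AMQP or the RabbitMQ client. The sketch only models the idea that publishes inside a transaction are staged and become visible on commit, and are discarded on rollback.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ToyTxDemo {

    /** Hypothetical stand-in for a transacted channel; NOT the RabbitMQ client API. */
    static class ToyTxChannel {
        private final Map<String, List<String>> queues = new HashMap<>();
        private final List<String[]> pending = new ArrayList<>();

        void publish(String queue, String body) {
            pending.add(new String[] {queue, body}); // staged, not yet visible
        }

        void commit() {
            for (String[] p : pending) {
                queues.computeIfAbsent(p[0], k -> new ArrayList<>()).add(p[1]);
            }
            pending.clear();
        }

        void rollback() {
            pending.clear(); // staged publishes are discarded
        }

        List<String> contents(String queue) {
            return queues.getOrDefault(queue, new ArrayList<>());
        }
    }

    /** Mirrors send() above: commit on normal return, roll back if the body throws. */
    static void send(ToyTxChannel channel, String in) {
        try {
            channel.publish("foo", in);
            if (in.equals("foo")) {
                throw new RuntimeException("test");
            }
            channel.publish("bar", in);
            channel.commit();
        } catch (RuntimeException e) {
            channel.rollback();
            throw e;
        }
    }

    public static void main(String[] args) {
        ToyTxChannel channel = new ToyTxChannel();
        try {
            send(channel, "foo");
        } catch (RuntimeException expected) {
            // the first publish to "foo" was rolled back
        }
        send(channel, "bar");
        System.out.println(channel.contents("foo")); // [bar]
        System.out.println(channel.contents("bar")); // [bar]
    }
}
```

With Spring, the try/commit/rollback wrapper is what the @Transactional proxy and the transaction manager do for you; the sketch just spells it out by hand.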
Here's a full Spring Boot application that demonstrates the feature...
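    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.transaction.PlatformTransactionManager;
    import org.springframework.transaction.annotation.EnableTransactionManagement;
    import org.springframework.transaction.annotation.Transactional;

    @SpringBootApplication
    @EnableTransactionManagement
    public class So40749877Application {

        public static void main(String[] args) {
            ConfigurableApplicationContext context = SpringApplication.run(So40749877Application.class, args);
            Foo foo = context.getBean(Foo.class);
            try {
                foo.send("foo");
            }
            catch (Exception e) {
                // expected: the RuntimeException rolls back the publish to "foo"
            }
            foo.send("bar");
            RabbitTemplate template = context.getBean(RabbitTemplate.class);
            // should not get any foos...
            System.out.println(template.receiveAndConvert("foo", 10_000));
            System.out.println(template.receiveAndConvert("bar", 10_000));
            // should be null
            System.out.println(template.receiveAndConvert("foo", 0));
            RabbitAdmin admin = context.getBean(RabbitAdmin.class);
            admin.deleteQueue("foo");
            admin.deleteQueue("bar");
            context.close();
        }

        @Bean
        public RabbitTemplate amqpTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            // publish on a transacted channel so sends take part in the transaction
            rabbitTemplate.setChannelTransacted(true);
            return rabbitTemplate;
        }

        @Bean
        public Queue foo() {
            return new Queue("foo");
        }

        @Bean
        public Queue bar() {
            return new Queue("bar");
        }

        @Bean
        public Foo fooBean() {
            return new Foo();
        }

        @Bean
        public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) {
            return new RabbitTransactionManager(connectionFactory);
        }

        public static class Foo {

            @Autowired
            private RabbitTemplate template;

            @Transactional
            public void send(String in) {
                this.template.convertAndSend("foo", in);
                if (in.equals("foo")) {
                    throw new RuntimeException("test");
                }
                this.template.convertAndSend("bar", in);
            }
        }
    }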
EDIT
Transactions on the consumer side: this does not generally apply when using Spring, because Spring manages the transaction, but when using the client directly...
    Connection connection = cf.createConnection();
    Channel channel = connection.createChannel(true);
    channel.basicQos(1);
    channel.txSelect();
    CountDownLatch latch = new CountDownLatch(1);
    channel.basicConsume("foo", new DefaultConsumer(channel) {

        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                byte[] body) throws IOException {
            System.out.println(new String(body));
            getChannel().txRollback(); // delivery won't be requeued; remains unacked
            if (envelope.isRedeliver()) {
                getChannel().basicAck(envelope.getDeliveryTag(), false);
                getChannel().txCommit(); // commit the ack so the message is removed
                getChannel().basicCancel(consumerTag);
                latch.countDown();
            }
            else { // first time, let's requeue
                getChannel().basicReject(envelope.getDeliveryTag(), true);
                getChannel().txCommit(); // commit the reject so the message will be requeued
            }
        }

    });
    latch.await();
    channel.close();
    connection.close();
Note that the txRollback does nothing in this case; only the ack (or reject) is transactional.
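The same point can be sketched as a toy model in plain Java. ToyConsumerTx and its methods are hypothetical names made up for this illustration and are not the RabbitMQ client API; messages are identified by their body purely to keep the sketch short. It assumes only what is stated above: a delivery itself is not part of the transaction, while acks and rejects are staged until commit.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical toy model of consumer-side transactions; NOT the RabbitMQ client. */
public class ToyConsumerTx {
    private final Deque<String> queue = new ArrayDeque<>();
    private final Set<String> unacked = new HashSet<>();
    private final List<Runnable> staged = new ArrayList<>(); // pending acks/rejects

    void enqueue(String message) {
        queue.addLast(message);
    }

    /** Delivery is not transactional: the message becomes unacked immediately. */
    String deliver() {
        String message = queue.pollFirst();
        if (message != null) {
            unacked.add(message);
        }
        return message;
    }

    void ack(String message) {
        staged.add(() -> unacked.remove(message));
    }

    void rejectAndRequeue(String message) {
        staged.add(() -> {
            if (unacked.remove(message)) {
                queue.addFirst(message);
            }
        });
    }

    void txCommit() {
        staged.forEach(Runnable::run);
        staged.clear();
    }

    /** Only discards staged acks/rejects; delivered messages stay unacked. */
    void txRollback() {
        staged.clear();
    }

    int queued() {
        return queue.size();
    }

    int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        ToyConsumerTx channel = new ToyConsumerTx();
        channel.enqueue("hello");
        String message = channel.deliver();
        channel.txRollback(); // nothing staged, so nothing changes
        System.out.println(channel.queued() + " queued, " + channel.unackedCount() + " unacked");
        channel.rejectAndRequeue(message);
        channel.txCommit(); // the reject takes effect; message is back on the queue
        System.out.println(channel.queued() + " queued, " + channel.unackedCount() + " unacked");
        channel.ack(channel.deliver());
        channel.txCommit(); // the ack takes effect; message is gone
        System.out.println(channel.queued() + " queued, " + channel.unackedCount() + " unacked");
    }
}
```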
Upvotes: 10