Reputation: 777
I have a spring-boot application that pulls all the messages from a RabbitMQ-queue and then terminates. I use rabbitTemplate
from the package spring-boot-starter-amqp
(version 2.4.0), namely receiveAndConvert()
. Somehow, I cannot get my application to start and stop again. When the rabbitConnectionFactory is created, it will never stop.
According to Google and other stackoverflow-questions, calling stop()
or destroy()
on the rabbitTemplate should do the job, but that doesn't work.
The rabbitTemplate is injected in the constructor.
Here is some code:
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
Object msg = getMessage();
while (msg != null) {
try {
String name = ((LinkedHashMap) msg).get(propertyName).toString();
//business logic
logger.debug("added_" + name);
} catch (Exception e) {
logger.error("" + e.getMessage());
}
msg = getMessage();
}
rabbitTemplate.stop();
private Object getMessage() {
try {
return rabbitTemplate.receiveAndConvert(queueName);
} catch (Exception e) {
logger.error("" + e.getMessage());
return null;
}
}
So, how do you terminate the connection to RabbitMQ properly?
Thanks for your inquiry.
Upvotes: 0
Views: 3176
Reputation: 174494
You can call resetConnection()
on the CachingConnectionFactory
to close the connection.
Or close()
the application context.
Upvotes: 1
Reputation: 2734
If I were to do it , I would use @RabbitListener
to receive the messages and RabbitListenerEndpointRegistry
to start and stop the listener. Sample Code is given below
@EnableScheduling
@SpringBootApplication
public class Application implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
public static final String queueName = "Hello";
@Bean
public Queue hello() {
return new Queue(queueName);
}
@Autowired
private RabbitTemplate template;
@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void send() {
String message = "Hello World!";
this.template.convertAndSend(queueName, message);
System.out.println(" [x] Sent '" + message + "'");
}
@Autowired
RabbitListenerEndpointRegistry registry;
@Override
public void run(ApplicationArguments args) throws Exception {
registry.getListenerContainer( Application.queueName).start();
Thread.sleep(10000L);
registry.getListenerContainer( Application.queueName).stop();
}
}
@Component
class Receiver {
@RabbitListener(id= Application.queueName,queues = Application.queueName)
public void receive(String in) {
System.out.println(" [x] Received '" + in + "'");
}
}
Upvotes: 0