user3603819

Reputation: 777

Stop RabbitMQ-Connection in Spring-Boot

I have a Spring Boot application that pulls all the messages from a RabbitMQ queue and then terminates. I use the RabbitTemplate from spring-boot-starter-amqp (version 2.4.0), specifically receiveAndConvert(). However, I cannot get the application to start and then stop again: once the rabbitConnectionFactory has been created, it never stops. According to Google and other Stack Overflow questions, calling stop() or destroy() on the rabbitTemplate should do the job, but that doesn't work. The rabbitTemplate is injected in the constructor.

Here is some code:

rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
Object msg = getMessage();
while (msg != null) {
    try {
        String name = ((LinkedHashMap) msg).get(propertyName).toString();
        //business logic
        logger.debug("added_" + name);
    } catch (Exception e) {
        logger.error("" + e.getMessage());
    }
    msg = getMessage();
}
rabbitTemplate.stop();

private Object getMessage() {
    try {
        return rabbitTemplate.receiveAndConvert(queueName);
    } catch (Exception e) {
        logger.error("" + e.getMessage());
        return null;
    }
}

So, how do you terminate the connection to RabbitMQ properly?

Thanks for your help.

Upvotes: 0

Views: 3176

Answers (2)

Gary Russell

Reputation: 174494

You can call resetConnection() on the CachingConnectionFactory to close the connection.

Or close() the application context.
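
Both options can be sketched roughly as follows. This is only an illustration, not code from the question: the class name RabbitShutdown and its method names are hypothetical, and it assumes the auto-configured connection factory is a CachingConnectionFactory (the Spring Boot default) and that the ConfigurableApplicationContext is injected alongside the RabbitTemplate.

```java
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;

// Hypothetical helper; class and method names are illustrative only.
@Component
public class RabbitShutdown {

    private final RabbitTemplate rabbitTemplate;
    private final ConfigurableApplicationContext context;

    public RabbitShutdown(RabbitTemplate rabbitTemplate,
                          ConfigurableApplicationContext context) {
        this.rabbitTemplate = rabbitTemplate;
        this.context = context;
    }

    // Option 1: close the cached connection(s) but keep the application running.
    public void closeConnection() {
        ConnectionFactory factory = rabbitTemplate.getConnectionFactory();
        // Assumption: the factory is the default CachingConnectionFactory.
        if (factory instanceof CachingConnectionFactory) {
            ((CachingConnectionFactory) factory).resetConnection();
        }
    }

    // Option 2: close the whole application context, which also
    // destroys the connection factory bean.
    public void shutdown() {
        context.close();
    }
}
```

In the question's code, one of these calls would take the place of rabbitTemplate.stop() after the receive loop. Note that after resetConnection() a new connection is opened on demand the next time the template is used, so closing the context is the more natural choice when the application should exit.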

Upvotes: 1

Shawrup

Reputation: 2734

If I were to do it, I would use @RabbitListener to receive the messages and RabbitListenerEndpointRegistry to start and stop the listener. Sample code is given below:

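import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@EnableScheduling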
@SpringBootApplication
public class Application implements ApplicationRunner {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    public static final String queueName = "Hello";

    @Bean
    public Queue hello() {
        return new Queue(queueName);
    }

    @Autowired
    private RabbitTemplate template;
    
    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void send() {
        String message = "Hello World!";
        this.template.convertAndSend(queueName, message);
        System.out.println(" [x] Sent '" + message + "'");
    }
    
    @Autowired
    RabbitListenerEndpointRegistry registry;

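    @Override
    public void run(ApplicationArguments args) throws Exception {
        // The container is looked up by the id given on @RabbitListener.
        // Containers auto-start by default, so start() only matters if autoStartup is disabled.
        registry.getListenerContainer(Application.queueName).start();
        Thread.sleep(10000L); // let the listener consume for 10 seconds
        registry.getListenerContainer(Application.queueName).stop();
    }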
}

@Component
class Receiver {
    @RabbitListener(id = Application.queueName, queues = Application.queueName)
    public void receive(String in) {
        System.out.println(" [x] Received '" + in + "'");
    }
}

Upvotes: 0
