Karis Njiru
Karis Njiru

Reputation: 41

Adding Dynamically created queues to a listener RabbitMq Spring

I am creating a Web Application for events-planners to better manage their events. Each event they create needs a queue which means the app needs to create a queue when an event is created. So far I have been able to create the queues and they appear in the rabbitmq management console but when I try to add a queue to a listener it brings this error: java.lang.NullPointerException: Cannot invoke "org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.addQueueNames(String[])" because the return value of "co.ke.mpango.backend.config.webSocket.RabbitQueueServiceImpl.getMessageListenerContainerById Here is the code:

@Service @Log4j2 public class RabbitQueueServiceImpl implements RabbitQueueService { @Autowired private RabbitAdmin rabbitAdmin; @Autowired private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry; @Override public void addNewQueue(String queueName, String exchangeName, String routingKey) { Queue queue = new Queue(queueName, true, false, false); Binding binding = new Binding( queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null ); rabbitAdmin.declareQueue(queue); rabbitAdmin.declareBinding(binding); this.addQueueToListener(exchangeName,queueName); } @Override public void addQueueToListener(String listenerId, String queueName) { log.info("adding queue : " + queueName + " to listener with id : " + listenerId); if (!checkQueueExistOnListener(listenerId,queueName)) { this.getMessageListenerContainerById(listenerId).addQueueNames(queueName); log.info("queue "); } else { log.info("given queue name : " + queueName + " not exist on given listener id : " + listenerId); } } @Override public void removeQueueFromListener(String listenerId, String queueName) { log.info("removing queue : " + queueName + " from listener : " + listenerId); if (checkQueueExistOnListener(listenerId,queueName)) { this.getMessageListenerContainerById(listenerId).removeQueueNames(queueName); log.info("deleting queue from rabbit management"); this.rabbitAdmin.deleteQueue(queueName); } else { log.info("given queue name : " + queueName + " not exist on given listener id : " + listenerId); } } @Override public Boolean checkQueueExistOnListener(String listenerId, String queueName) { try { log.info("checking queueName : " + queueName + " exist on listener id : " + listenerId); log.info("getting queueNames"); String[] queueNames = this.getMessageListenerContainerById(listenerId).getQueueNames(); if (queueNames != null) { log.info("checking " + queueName + " exist on active queues"); for (String name : queueNames) { log.info("name : " + name + " with checking name : " + queueName); if (name.equals(queueName)) { log.info("queue name exist on listener, returning true"); return Boolean.TRUE; } } return Boolean.FALSE; } else { log.info("there is no queue exist on listener"); return Boolean.FALSE; } } catch (Exception e) { log.error("Error on checking queue exist on listener"); return Boolean.FALSE; } } private AbstractMessageListenerContainer getMessageListenerContainerById(String listenerId) { log.info("getting message listener container by id : " + listenerId); return ((AbstractMessageListenerContainer) this.rabbitListenerEndpointRegistry .getListenerContainer(listenerId) ); } }

Added: How could then one listener be defined?

Upvotes: 4

Views: 712

Answers (1)

saver
saver

Reputation: 2684

RabbitListenerEndpointRegistry class is storage for all listener containers(SimpleRabbitListenerContainer, DirectRabbitListenerContainer) and for managing their life cycle. Before taking MessageListenerContainer from RabbitListenerEndpointRegistry you need to register this container inside the registry like this:

SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();
endpoint.setId(LISTENER_CONTAINER_ID);
endpoint.setMessageListener(message -> {
        RabbitMQMessage<QueueItem> rabbitMQMessage = (RabbitMQMessage<QueueItem>) SerializationUtils.deserialize(message.getBody());
        rabbitMQService.onMessage(rabbitMQMessage); //method where your message will be processed
});

//listenerContainerFactory is instance of DirectRabbitListenerContainerFactory or SimpleRabbitListenerContainerFactory
registry.registerListenerContainer(endpoint, listenerContainerFactory, true);

//start the registry if it is created manually
//registry.start();

and then you can take that container from registry by LISTENER_CONTAINER_ID

registry.getListenerContainer(LISTENER_CONTAINER_ID);

To manage dynamic queues it is better to use DirectMessageListenerContainer, this container will open a separate channel for each queue without restarting the container like SimpleRabbitListenerContainer does after adding a new queue and it will take some time until SimpleRabbitListenerContainer will be ready to receive messages from the queues.

See more differences here: https://docs.spring.io/spring-amqp/reference/html/#choose-container

Upvotes: 1

Related Questions