Reputation: 110
I'm trying to build a Java service with Spring Boot that connects to a RabbitMQ exchange, discovers new queues (those whose names match a given prefix), and consumes from them. I'm using RabbitManagementTemplate to discover the queues and SimpleMessageListenerContainer to bind to them. This part works fine.
The problem is that when one of these dynamic queues gets deleted (through the web interface, for example), my service can't handle the exception, and I couldn't find a place to register a handler for it. In these cases I just want to ignore the deletion and move on; I don't want to recreate the queue.
My code looks something like this:
@Scheduled(fixedDelay = 3 * 1000)
public void watchNewQueues() {
    for (Queue queue : rabbitManagementTemplate.getQueues()) {
        final String queueName = queue.getName();
        String[] nameParts = queueName.split("\\.");
        if ("dynamic-queue".equals(nameParts[0]) && !context.containsBean(queueName)) {
            logger.info("New queue discovered! Binding to {}", queueName);
            Binding binding = BindingBuilder.bind(queue).to(exchange).with("testroute.#");
            rabbitAdmin.declareBinding(binding);
            rabbitAdmin.declareQueue(queue);
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames(queueName);
            container.setMessageListener(new MyMessageListener());
            container.setPrefetchCount(settings.getPrefetch());
            container.setAutoDeclare(false);
            container.setMissingQueuesFatal(true);
            container.setDeclarationRetries(0);
            container.setFailedDeclarationRetryInterval(-1);
            context.getBeanFactory().registerSingleton(queueName, container);
            container.start();
        }
    }
}

@Override
public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
    if (event.getSource() instanceof SimpleMessageListenerContainer) {
        SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) event.getSource();
        if (context.getAutowireCapableBeanFactory() instanceof BeanDefinitionRegistry) {
            logger.info("Removing bean! {}", container.getQueueNames()[0]);
            ((BeanDefinitionRegistry) context.getAutowireCapableBeanFactory()).removeBeanDefinition(container.getQueueNames()[0]);
        } else {
            logger.info("Context is not able to remove bean");
        }
    } else {
        logger.info("Got event but is not a SimpleMessageListenerContainer {}", event.toString());
    }
}
When the queue gets deleted, the console logs:
2018-03-13 15:01:29.623 WARN 32736 [pool-1-thread-6] --- o.s.a.r.listener.BlockingQueueConsumer : Cancel received for amq.ctag-wKQUQkUNOSCtjQ9RBUNCig; Consumer: tags=[{amq.ctag-wKQUQkUNOSCtjQ9RBUNCig=dynamic-queue.some-test}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@localhost:5672/,3), conn: Proxy@23510c77 Shared Rabbit Connection: SimpleConnection@66c17803 [delegate=amqp://guest@localhost:5672/], acknowledgeMode=AUTO local queue size=0
2018-03-13 15:01:30.219 WARN 32736 [SimpleAsyncTaskExecutor-1] --- o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.rabbit.support.ConsumerCancelledException
2018-03-13 15:01:30.219 INFO 32736 [SimpleAsyncTaskExecutor-1] --- o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer: tags=[{}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@localhost:5672/,3), conn: Proxy@23510c77 Shared Rabbit Connection: SimpleConnection@66c17803 [delegate=amqp://guest@localhost:5672/], acknowledgeMode=AUTO local queue size=0
2018-03-13 15:01:30.243 WARN 32736 [SimpleAsyncTaskExecutor-2] --- o.s.a.r.listener.BlockingQueueConsumer : Failed to declare queue:dynamic-queue.some-test
2018-03-13 15:01:30.246 WARN 32736 [SimpleAsyncTaskExecutor-2] --- o.s.a.r.listener.BlockingQueueConsumer : Queue declaration failed; retries left=3
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[dynamic-queue.some-test]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:571)
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:470)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1171)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: null
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:885)
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:61)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:835)
at com.sun.proxy.$Proxy63.queueDeclarePassive(Unknown Source)
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:550)
... 3 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'dynamic-queue.some-test' in vhost '/', class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 12 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'dynamic-queue.some-test' in vhost '/', class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:484)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:321)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:554)
... 1 common frames omitted
Thanks for your attention
EDIT: Thanks! I was able to avoid the recreation of the queue. I'm now struggling to remove the queue from the Spring Context :)
Upvotes: 1
Views: 2270
Reputation: 11
The easiest way is to use an ApplicationListener for MissingQueueEvent:
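import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.listener.MissingQueueEvent;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

@Component
public class MissingQueueListener implements ApplicationListener<MissingQueueEvent> {
    private static final Logger logger = LoggerFactory.getLogger(MissingQueueListener.class);

    @Override
    public void onApplicationEvent(MissingQueueEvent missingQueueEvent) {
        // Stop the container from consuming from (and re-declaring) the deleted queue.
        ((SimpleMessageListenerContainer) missingQueueEvent.getSource()).removeQueueNames(missingQueueEvent.getQueue());
        logger.error("Removing missing queue {} from its container", missingQueueEvent.getQueue());
    }
}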
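If the goal is to drop the whole container rather than only the queue name, one option is to track the dynamic containers in a plain map instead of registering them as singletons in the Spring context. The sketch below only illustrates that idea and is not Spring AMQP API: ContainerRegistry and its method names are hypothetical, and the container type is left generic so the example runs without Spring on the classpath.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * Hypothetical helper (not part of Spring AMQP): keeps one listener
 * container per queue name so it can be dropped when the queue disappears.
 */
final class ContainerRegistry<C> {

    private final Map<String, C> containers = new ConcurrentHashMap<>();
    private final Consumer<C> stopper;

    /** @param stopper how to shut a container down, e.g. a method reference to its stop method */
    ContainerRegistry(Consumer<C> stopper) {
        this.stopper = stopper;
    }

    /** Creates and stores a container unless the queue is already tracked; returns true if one was created. */
    boolean registerIfAbsent(String queueName, Function<String, C> factory) {
        boolean[] created = {false};
        containers.computeIfAbsent(queueName, name -> {
            created[0] = true;
            return factory.apply(name);
        });
        return created[0];
    }

    /** Stops and forgets the container of a deleted queue; returns true if one was tracked. */
    boolean remove(String queueName) {
        C container = containers.remove(queueName);
        if (container == null) {
            return false;
        }
        stopper.accept(container);
        return true;
    }

    /** Replacement for the context.containsBean(queueName) check in the scheduled scan. */
    boolean isTracked(String queueName) {
        return containers.containsKey(queueName);
    }
}
```

With Spring AMQP, C would be SimpleMessageListenerContainer and the stopper could be SimpleMessageListenerContainer::stop; the listener could then call registry.remove(missingQueueEvent.getQueue()). If you would rather keep using registerSingleton, note that removeBeanDefinition only applies to beans that have a bean definition; as far as I know, the counterpart for manually registered singletons is destroySingleton(beanName) on DefaultListableBeanFactory, and you would still need to stop the container yourself.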
Upvotes: 1
Reputation: 174554
You'll get error logs, of course, but with container.setMissingQueuesFatal(true) (the default), the container will stop itself after 3 attempts to declare the queue at 5-second intervals.
You can change how long it takes to stop by setting declarationRetries (default 3) and failedDeclarationRetryInterval (default 5000 ms).
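As a rough back-of-the-envelope check, the time a container keeps retrying before it stops is about declarationRetries × failedDeclarationRetryInterval. The helper below is a hypothetical illustration of that arithmetic only (DeclarationRetryWindow is not a Spring AMQP class). It ignores the time the declaration attempts themselves take, and treating non-positive values as "no waiting" is an assumption of this sketch.

```java
import java.time.Duration;

/** Hypothetical helper: estimates how long a container waits between failed declarations before giving up. */
final class DeclarationRetryWindow {

    private DeclarationRetryWindow() {
    }

    /**
     * @param declarationRetries number of retries after a failed passive declaration
     * @param retryIntervalMillis pause between retries, in milliseconds
     * @return approximate total time spent waiting between attempts
     */
    static Duration approximate(int declarationRetries, long retryIntervalMillis) {
        // Assumption of this sketch: zero or negative settings mean no waiting at all.
        if (declarationRetries <= 0 || retryIntervalMillis <= 0) {
            return Duration.ZERO;
        }
        return Duration.ofMillis(Math.multiplyExact((long) declarationRetries, retryIntervalMillis));
    }
}
```

With the defaults (3 retries, 5000 ms) this gives roughly 15 seconds. Lowering either value shortens the window, and with declarationRetries set to 0 the container should give up on the first failed declaration.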
Upvotes: 3