Reputation: 464
I have configured a RabbitMQ cluster, and I use Spring Cloud Stream over RabbitMQ with the following configuration:
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface FileChangedSink {

    String INPUT = "fileChanged";

    @Input(INPUT)
    SubscribableChannel fileChanged();
}
And application.yml
spring:
  cloud:
    stream:
      bindings:
        fileChanged:
          destination: file.changed
          binder: stream_rabbit
          consumer:
            max-attempts: 1
      binders:
        stream_rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: ${RABBITMQ_HOST}
                port: ${RABBITMQ_NODE_PORT_NUMBER}
                username: ${RABBITMQ_DEFAULT_USER}
                password: ${RABBITMQ_DEFAULT_PASS}
                virtual-host: ${RABBITMQ_DEFAULT_VHOST}
Versions:
Spring Boot Version: 1.5.10.RELEASE
spring-cloud-starter-stream: 1.3.2.RELEASE
spring-cloud-starter-stream-rabbit: 1.3.3.RELEASE
Usually this config works fine, but recently I got the following exception:
2019-10-28T08:11:51.832Z 2019-10-28 08:11:51.831 WARN [search-service,,,] 1 --- [qW883QNAvQ-5986] o.s.a.r.listener.BlockingQueueConsumer : Failed to declare queue: file.changed.anonymous.nL_BWxroRf65qW883QNAvQ
2019-10-28T08:11:56.835Z 2019-10-28 08:11:56.835 ERROR [search-service,,,] 1 --- [qW883QNAvQ-5986] o.s.a.r.l.SimpleMessageListenerContainer : Consumer received fatal=false exception on startup
2019-10-28T08:11:56.835Z org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
2019-10-28T08:11:56.835Z at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:626)
2019-10-28T08:11:56.835Z at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1472)
2019-10-28T08:11:56.835Z at java.base/java.lang.Thread.run(Thread.java:844)
2019-10-28T08:11:56.835Z Caused by: org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[file.changed.anonymous.nL_BWxroRf65qW883QNAvQ]
2019-10-28T08:11:56.835Z at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:718)
2019-10-28T08:11:56.835Z at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:594)
2019-10-28T08:11:56.835Z ... 2 common frames omitted
2019-10-28T08:11:56.835Z Caused by: java.io.IOException: null
2019-10-28T08:11:56.835Z at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:105)
2019-10-28T08:11:56.835Z at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:101)
2019-10-28T08:11:56.835Z at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:123)
2019-10-28T08:11:56.835Z at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:992)
2019-10-28T08:11:56.835Z at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:50)
2019-10-28T08:11:56.835Z at jdk.internal.reflect.GeneratedMethodAccessor1279.invoke(Unknown Source)
2019-10-28T08:11:56.835Z at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2019-10-28T08:11:56.835Z at java.base/java.lang.reflect.Method.invoke(Method.java:564)
2019-10-28T08:11:56.835Z at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:980)
2019-10-28T08:11:56.835Z at com.sun.proxy.$Proxy228.queueDeclarePassive(Unknown Source)
2019-10-28T08:11:56.835Z at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:697)
2019-10-28T08:11:56.835Z ... 3 common frames omitted
2019-10-28T08:11:56.835Z Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'file.changed.anonymous.nL_BWxroRf65qW883QNAvQ' in vhost '/', class-id=50, method-id=10)
2019-10-28T08:11:56.835Z at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
2019-10-28T08:11:56.835Z at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:32)
2019-10-28T08:11:56.835Z at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:366)
2019-10-28T08:11:56.835Z at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:229)
2019-10-28T08:11:56.835Z at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:117)
2019-10-28T08:11:56.835Z ... 11 common frames omitted
2019-10-28T08:11:56.835Z Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'file.changed.anonymous.nL_BWxroRf65qW883QNAvQ' in vhost '/', class-id=50, method-id=10)
2019-10-28T08:11:56.835Z at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:505)
2019-10-28T08:11:56.835Z at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:336)
2019-10-28T08:11:56.835Z at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:143)
2019-10-28T08:11:56.836Z at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:90)
2019-10-28T08:11:56.836Z at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:634)
2019-10-28T08:11:56.836Z at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
2019-10-28T08:11:56.836Z at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:572)
2019-10-28T08:11:56.836Z ... 1 common frames omitted
2019-10-28T08:11:56.836Z 2019-10-28 08:11:56.835 INFO [search-service,,,] 1 --- [qW883QNAvQ-5986] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@2fe6f292: tags=[{}], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,10419), conn: Proxy@2f143ea Shared Rabbit Connection: SimpleConnection@1daf2e3e [delegate=amqp://[email protected]:5672/, localPort= 52338], acknowledgeMode=AUTO local queue size=0
2019-10-28T08:11:56.848Z 2019-10-28 08:11:56.848 WARN [search-service,,,] 1 --- [qW883QNAvQ-5987] o.s.a.r.listener.BlockingQueueConsumer : Failed to declare queue: file.changed.anonymous.nL_BWxroRf65qW883QNAvQ
2019-10-28T08:11:56.848Z 2019-10-28 08:11:56.848 WARN [search-service,,,] 1 --- [qW883QNAvQ-5987] o.s.a.r.listener.BlockingQueueConsumer : Queue declaration failed; retries left=3
2019-10-28T08:11:56.848Z org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[file.changed.anonymous.nL_BWxroRf65qW883QNAvQ]
Note: This error occurs randomly; most of the time my config works fine.
From the error, I understand that the queue had not been created when the stream tried to connect to it.
So my question is: how can this happen, and how can I prevent it?
Upvotes: 0
Views: 1148
Reputation: 174504
Boot 1.5.x is no longer supported; the last version, 1.5.22, was released in August.
You need to set the queue argument x-queue-master-locator to client-local to ensure that an anonymous queue is created on the node that the application is connected to.
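To make the argument concrete, here is a minimal sketch of what it looks like at the client level. It only builds the arguments map with the standard library; the class and method names (MasterLocatorArgs, clientLocalArgs) are made up for illustration. With the plain RabbitMQ Java client, a map like this would be passed as the last parameter of Channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments). Whether a given binder version lets you supply it is a separate question.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class MasterLocatorArgs {

    // Builds the optional queue arguments that ask the broker to place
    // the queue master on the node the client is connected to.
    public static Map<String, Object> clientLocalArgs() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-queue-master-locator", "client-local");
        return Collections.unmodifiableMap(args);
    }

    public static void main(String[] args) {
        // Illustration only: print the map that would be handed to
        // Channel.queueDeclare(...) as its "arguments" parameter.
        System.out.println(clientLocalArgs());
    }
}
```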
Starting with spring-amqp version 2.1 (used by Spring Boot 2.1 and binder 2.2), this is done automatically by the framework.
With earlier versions, you can set the master locator for your anonymous queues using a policy on the broker.
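As a rough sketch of the policy route, the code below builds a request for the RabbitMQ management HTTP API (PUT /api/policies/{vhost}/{name}) that sets the queue-master-locator policy key to client-local. Several things here are assumptions and not part of the original answer: the policy name anon-client-local, the pattern matching the anonymous queue names seen in the log, the class and method names, and the availability of the management plugin on your broker. The same policy can also be defined with rabbitmqctl or in the management UI.

```java
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class MasterLocatorPolicy {

    // JSON body for the policy; the pattern is a regex on queue names.
    public static String policyBody(String queuePattern) {
        // Escape backslashes and quotes so the regex survives as a JSON string.
        String escaped = queuePattern.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"pattern\":\"" + escaped + "\","
                + "\"definition\":{\"queue-master-locator\":\"client-local\"},"
                + "\"apply-to\":\"queues\"}";
    }

    // Request path; the default vhost "/" has to be percent-encoded as %2F.
    public static String policyPath(String vhost, String policyName) {
        try {
            return "/api/policies/" + URLEncoder.encode(vhost, "UTF-8")
                    + "/" + URLEncoder.encode(policyName, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always available", e);
        }
    }

    // Sends the policy to a broker; baseUrl is e.g. "http://localhost:15672" (assumed).
    public static int putPolicy(String baseUrl, String user, String pass,
                                String vhost, String name, String pattern) throws Exception {
        HttpURLConnection conn =
                (HttpURLConnection) new URL(baseUrl + policyPath(vhost, name)).openConnection();
        conn.setRequestMethod("PUT");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/json");
        String token = Base64.getEncoder()
                .encodeToString((user + ":" + pass).getBytes(StandardCharsets.UTF_8));
        conn.setRequestProperty("Authorization", "Basic " + token);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(policyBody(pattern).getBytes(StandardCharsets.UTF_8));
        }
        return conn.getResponseCode();
    }

    public static void main(String[] args) {
        // Hypothetical pattern for the anonymous queues of the file.changed destination.
        String pattern = "^file\\.changed\\.anonymous\\.";
        // Only prints the request; call putPolicy(...) against a real broker to apply it.
        System.out.println("PUT " + policyPath("/", "anon-client-local"));
        System.out.println(policyBody(pattern));
    }
}
```

Matching only the anonymous queues keeps the policy from touching durable, named queues, which may have their own placement requirements.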
https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit/issues/245
EDIT
Screen shot of policy:
Upvotes: 2