Reputation: 161
I have a remote RabbitMQ server which has some queues I want to listen to. I tried this:
@RabbitListener(queues = "queueName")
public void receive(String message) {
    System.out.println(message);
}
But it tried to create a new queue. The result was predictable: access denied.
o.s.a.r.listener.BlockingQueueConsumer : Failed to declare queue: queueName
I didn't declare any queue in any other way.
How can I listen to an existing queue on a remote server? Also, is there a way to check if this queue exists? And I saw this line
@RabbitListener(queues = "#{autoDeleteQueue2.name}")
in a tutorial. What does #{queueName.name} mean?
Logs and the beginning of the stack trace:
2018-08-30 22:10:21.968 WARN 12124 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Failed to declare queue: queueName
2018-08-30 22:10:21.991 WARN 12124 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Queue declaration failed; retries left=3
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[queueName]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:711) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:588) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:996) [spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
Upvotes: 3
Views: 11667
Reputation: 585
Here is an example of how to listen to a specific 'queue' using Spring Integration:
SpringIntegrationConfiguration.java
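import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;

@Configuration
public class SpringIntegrationConfiguration {

    @Value("${rabbitmq.queueName}")
    private String queueName;

    // Consumes from the named queue and prints every received message
    @Bean
    public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, queueName))
                .handle(System.out::println)
                .get();
    }
}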
ApplicationConfiguration.java
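import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ApplicationConfiguration {

    @Value("${rabbitmq.topicExchangeName}")
    private String topicExchangeName;

    @Value("${rabbitmq.queueName}")
    private String queueName;

    @Value("${rabbitmq.routingKey}")
    private String routingKey;

    // Non-durable queue (second argument is the durable flag)
    @Bean
    Queue queue() {
        return new Queue(queueName, false);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(topicExchangeName);
    }

    // Binds the queue to the topic exchange using the configured routing key
    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(routingKey);
    }
}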
Application.yml
rabbitmq:
  topicExchangeName: spring-boot-exchange
  queueName: spring-boot
  routingKey: foo.bar.#
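The routing key foo.bar.# above is a topic pattern: words are separated by dots, * stands for exactly one word, and # stands for zero or more words. The following is a minimal, broker-free sketch of those matching rules in plain Java. It is not RabbitMQ's actual implementation, and the names TopicPatternSketch and matches are made up for illustration.

```java
class TopicPatternSketch {

    /** Returns true if routingKey matches the binding pattern under topic-exchange rules. */
    static boolean matches(String pattern, String routingKey) {
        String[] patternWords = pattern.split("\\.");
        String[] keyWords = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return matches(patternWords, 0, keyWords, 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: match only if the key is exhausted too
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible continuation point
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Pattern still expects a word but the key has none left
            return false;
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            // '*' consumes exactly one word; a literal must be equal
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("foo.bar.#", "foo.bar"));         // true
        System.out.println(matches("foo.bar.#", "foo.bar.baz.qux")); // true
        System.out.println(matches("foo.bar.#", "foo.baz"));         // false
    }
}
```

With this binding, a message published with routing key foo.bar or foo.bar.anything.else would reach the spring-boot queue, while foo.baz would not.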
Upvotes: 0
Reputation: 174779
Even if you don't have configuration permission on the broker, the queueDeclarePassive used by the listener is allowed (it checks for the presence of the queue).
o.s.a.r.listener.BlockingQueueConsumer : Failed to declare queue: queueName
That just means that the queue doesn't exist.
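To check for the queue yourself, one option is RabbitAdmin.getQueueProperties, which also relies on a passive declaration and returns null when the queue does not exist. A minimal sketch, assuming a ConnectionFactory for the remote broker is already configured; the class name QueueExistenceCheck is hypothetical:

```java
import java.util.Properties;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;

// Hypothetical helper, not part of Spring AMQP
public class QueueExistenceCheck {

    private final RabbitAdmin admin;

    public QueueExistenceCheck(ConnectionFactory connectionFactory) {
        this.admin = new RabbitAdmin(connectionFactory);
    }

    /** Returns true if the broker reports a queue with this name. */
    public boolean queueExists(String queueName) {
        // getQueueProperties returns null if the queue does not exist
        Properties properties = admin.getQueueProperties(queueName);
        return properties != null;
    }
}
```

If this returns false for a queue you expect to be there, it is worth double-checking the exact queue name and the virtual host the connection uses.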
@RabbitListener(queues = "#{autoDeleteQueue2.name}")
That is used to get the queue name at runtime (when you have permission to create queues).
e.g.
@Bean
public AnonymousQueue autoDeleteQueue2() {
    return new AnonymousQueue();
}
Spring will add that queue to the broker with a random, unique name. The listener is then configured with the actual queue name.
Upvotes: 4
Reputation: 3805
Here is an example of how to listen to a queue with RabbitMQ:
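import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitConsumer implements MessageListener {

    // Declares a durable queue bound to a durable fanout exchange, then listens on it
    @RabbitListener(bindings =
        @QueueBinding(
            value = @Queue(value = "${queue.topic}", durable = "true"),
            exchange = @Exchange(value = "${queue.exchange}", type = ExchangeTypes.FANOUT, durable = "true")
        )
    )
    @Override
    public void onMessage(Message message) {
        // ...
    }
}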
And the config (application.yaml):
queue:
  topic: mytopic
  exchange: myexchange
In RabbitMQ, consumers read from queues, and queues are bound to exchanges. The exchange type and the bindings let you define how messages are distributed (should every consumer receive every message? Is it enough for a single consumer to read each message? ...)
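The two delivery questions above can be illustrated without a broker. The following is a rough plain-Java simulation, not RabbitMQ code: fanout copies each message to every bound queue, while consumers sharing one queue split the messages between them (modeled here as simple round-robin, ignoring prefetch and acknowledgements). The names DeliverySketch, fanout, and roundRobin are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class DeliverySketch {

    /** Fanout exchange: each bound queue receives its own copy of every message. */
    static List<List<String>> fanout(List<String> messages, int queueCount) {
        List<List<String>> queues = new ArrayList<>();
        for (int q = 0; q < queueCount; q++) {
            queues.add(new ArrayList<>(messages));
        }
        return queues;
    }

    /** Consumers sharing one queue: each message is handed to exactly one of them. */
    static List<List<String>> roundRobin(List<String> messages, int consumerCount) {
        List<List<String>> consumers = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            consumers.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            consumers.get(i % consumerCount).add(messages.get(i));
        }
        return consumers;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("a", "b", "c");
        System.out.println(fanout(messages, 2));     // [[a, b, c], [a, b, c]]
        System.out.println(roundRobin(messages, 2)); // [[a, c], [b]]
    }
}
```

So if every consumer should see every message, give each consumer its own queue bound to a fanout exchange; if one consumer per message is enough, let the consumers share a single queue.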
Upvotes: 2