Reputation: 459
I am using RabbitMQ as the input of a Spring Cloud Stream application. I am trying to bind multiple exchanges to a single queue, and I'm doing so by listing the exchanges comma-separated in the destination (as described in the documentation). This is my application-orders.yml:
spring:
  main:
    # added this to avoid
    # org.springframework.beans.factory.support.BeanDefinitionOverrideException: Invalid bean definition with name 'my.queue.orders.errors.recoverer' defined in null: Cannot register bean definition for bean 'my.queue.orders.errors.recoverer': There is already bound.
    allow-bean-definition-overriding: true
  rabbitmq:
    # username, password, address
  cloud:
    stream:
      bindings:
        input:
          binder: rabbitmq
          destination: mapper1.fanout.replication,mapper2.fanout.replication
          group: my.queue.orders
And this is in the base application.yml:
(spring.cloud.)stream:
  default:
    content-type: application/json
  binders:
    rabbit:
      type: rabbit
  bindings:
    input:
      binder: ${application.source.binder}
      # e.g. rabbit exchange name
      destination: ${application.source.destination}
      # e.g. rabbit queue name
      group: ${application.source.group}
      consumer:
        max-attempts: 1
        # number of threads consuming messages
        # may be critical because of ordering of messages
        concurrency: 1
    ...
  rabbit:
    binder:
      nodes: ${application.source.nodes}
    bindings:
      input:
        consumer:
          exchangeType: fanout
          queueNameGroupOnly: true
          exchangeAutoDelete: false
          # the following settings will not ACK the message on failure during send to EventHubs
          republishToDlq: false
          requeueRejected: true
When I send a message from exchange mapper1.fanout.replication, it works as expected. But if I send the message from the second exchange mapper2.fanout.replication, I get the following:
2020-11-06 14:32:06.480 INFO 67016 --- [ask-scheduler-1] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: my.queue.orders, bound to: mappper2.fanout.replication
2020-11-06 14:32:06.481 DEBUG 67016 --- [ask-scheduler-1] o.s.amqp.rabbit.core.RabbitTemplate : Executing callback RabbitAdmin$$Lambda$1049/0x0000000800868c40 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,2), conn: Proxy@4a5f9770 Shared Rabbit Connection: SimpleConnection@5ec708d9 [delegate=amqp://[email protected]:5672/, localPort= 62850]
2020-11-06 14:32:06.481 DEBUG 67016 --- [ask-scheduler-1] o.s.amqp.rabbit.core.RabbitAdmin : declaring Exchange 'mapper2.fanout.replication'
2020-11-06 14:32:06.488 DEBUG 67016 --- [ask-scheduler-1] o.s.amqp.rabbit.core.RabbitTemplate : Executing callback RabbitAdmin$$Lambda$1059/0x000000080086c440 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,2), conn: Proxy@4a5f9770 Shared Rabbit Connection: SimpleConnection@5ec708d9 [delegate=amqp://[email protected]:5672/, localPort= 62850]
2020-11-06 14:32:06.488 DEBUG 67016 --- [ask-scheduler-1] o.s.amqp.rabbit.core.RabbitAdmin : declaring Queue 'my.queue.orders'
2020-11-06 14:32:06.493 DEBUG 67016 --- [ask-scheduler-1] o.s.amqp.rabbit.core.RabbitTemplate : Executing callback RabbitAdmin$$Lambda$1060/0x000000080086b840 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,2), conn: Proxy@4a5f9770 Shared Rabbit Connection: SimpleConnection@5ec708d9 [delegate=amqp://[email protected]:5672/, localPort= 62850]
2020-11-06 14:32:06.493 DEBUG 67016 --- [ask-scheduler-1] o.s.amqp.rabbit.core.RabbitAdmin : Binding destination [my.queue.orders (QUEUE)] to exchange [mapper2.fanout.replication] with routing key []
2020-11-06 14:32:06.497 DEBUG 67016 --- [ask-scheduler-1] c.s.b.r.p.RabbitExchangeQueueProvisioner : autoBindDLQ=false for: my.queue.orders
2020-11-06 14:32:06.497 DEBUG 67016 --- [ask-scheduler-1] o.s.a.r.l.SimpleMessageListenerContainer : Changing consumers from 1 to 1
2020-11-06 14:32:06.499 ERROR 67016 --- [ask-scheduler-1] o.s.cloud.stream.binding.BindingService : Failed to create consumer binding; retrying in 30 seconds
org.springframework.cloud.stream.binder.BinderException: Exception thrown while starting consumer:
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:461)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:90)
at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:143)
at org.springframework.cloud.stream.binding.BindingService.lambda$rescheduleConsumerBinding$1(BindingService.java:201)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalStateException: Only one LastSubscriberMessageHandler is allowed
at org.springframework.cloud.stream.binder.BinderErrorChannel.subscribe(BinderErrorChannel.java:44)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.registerErrorInfrastructure(AbstractMessageChannelBinder.java:712)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.registerErrorInfrastructure(AbstractMessageChannelBinder.java:633)
at org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder.createConsumerEndpoint(RabbitMessageChannelBinder.java:511)
at org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder.createConsumerEndpoint(RabbitMessageChannelBinder.java:129)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:407)
... 10 common frames omitted
I have a custom Processor as follows:
interface Processor {
    String INPUT = "input";

    @Input(Processor.INPUT)
    SubscribableChannel input();

    //outputs...
}
And a MessageHandler class:
@Profile({ "orders" })
@EnableBinding(Processor.class)
@RequiredArgsConstructor
@Slf4j
public class MessageHandler {
    @Autowired
    private Processor pipe;

    @StreamListener(Processor.INPUT)
    public void handleInputMessage(final Message<String> message) {
        //do stuff...
    }
    ...
}
I am using Spring Cloud Hoxton.SR8 and spring-cloud-stream-binder-rabbit-3.0.8.
Any thoughts?
Upvotes: 1
Views: 1811
Reputation: 174574
Try setting the multiplex consumer property to true; you will then get a single binding that consumes from both queues.
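As a rough sketch of where that property would go, assuming the binding is still named input as in the question and that the other binding settings stay as they are, the consumer section of the configuration might look like this:

```yaml
spring:
  cloud:
    stream:
      bindings:
        input:
          # comma-separated exchanges, unchanged from the question
          destination: mapper1.fanout.replication,mapper2.fanout.replication
          group: my.queue.orders
          consumer:
            # consume all listed destinations through one binding
            # instead of creating one consumer binding per destination
            multiplex: true
```

With multiplex left at its default of false, a comma-separated destination results in a separate consumer binding per destination, which appears to be what leads to the "Only one LastSubscriberMessageHandler is allowed" error in this setup.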
Upvotes: 3