gcpdev

Reputation: 459

Binding multiple RabbitMQ exchanges to single queue throws error in Spring Cloud Stream

I am using RabbitMQ as the input for a Spring Cloud Stream application. I am trying to bind multiple exchanges to a single queue by listing the exchanges, comma-separated, as the binding's destination (as described in the documentation). This is my application-orders.yml:

spring:
  main:
    # added this to avoid 
    # org.springframework.beans.factory.support.BeanDefinitionOverrideException: Invalid bean definition with name 'my.queue.orders.errors.recoverer' defined in null: Cannot register bean definition for bean 'my.queue.orders.errors.recoverer': There is already  bound.
    allow-bean-definition-overriding: true
  rabbitmq:
    # username, password, address
  cloud:
    stream:
      bindings:
        input:
          binder: rabbitmq
          destination: mapper1.fanout.replication,mapper2.fanout.replication
          group: my.queue.orders

and this is in the base application.yml:

(spring.cloud.)stream:
      default:
        content-type: application/json
      binders:
        rabbit:
          type: rabbit

      bindings:
        input:
          binder: ${application.source.binder}
          # e.g. rabbit exchange name
          destination: ${application.source.destination}
          # e.g. rabbit queue name
          group: ${application.source.group}
          consumer:
            max-attempts: 1
            # number of threads consuming messages
            # may be critical because of ordering of messages
            concurrency: 1
      ...
      rabbit:
        binder:
          nodes: ${application.source.nodes}
        bindings:
          input:
            consumer:
              exchangeType: fanout
              queueNameGroupOnly: true
              exchangeAutoDelete: false
              # the following settings will not ACK the message on failure during send to EventHubs
              republishToDlq: false
              requeueRejected: true

When I send a message via the first exchange, mapper1.fanout.replication, it works as expected. But when I send a message via the second exchange, mapper2.fanout.replication, I get the following:

2020-11-06 14:32:06.480  INFO 67016 --- [ask-scheduler-1] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: my.queue.orders, bound to: mappper2.fanout.replication
2020-11-06 14:32:06.481 DEBUG 67016 --- [ask-scheduler-1] o.s.amqp.rabbit.core.RabbitTemplate      : Executing callback RabbitAdmin$$Lambda$1049/0x0000000800868c40 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,2), conn: Proxy@4a5f9770 Shared Rabbit Connection: SimpleConnection@5ec708d9 [delegate=amqp://[email protected]:5672/, localPort= 62850]
2020-11-06 14:32:06.481 DEBUG 67016 --- [ask-scheduler-1] o.s.amqp.rabbit.core.RabbitAdmin         : declaring Exchange 'mapper2.fanout.replication'
2020-11-06 14:32:06.488 DEBUG 67016 --- [ask-scheduler-1] o.s.amqp.rabbit.core.RabbitTemplate      : Executing callback RabbitAdmin$$Lambda$1059/0x000000080086c440 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,2), conn: Proxy@4a5f9770 Shared Rabbit Connection: SimpleConnection@5ec708d9 [delegate=amqp://[email protected]:5672/, localPort= 62850]
2020-11-06 14:32:06.488 DEBUG 67016 --- [ask-scheduler-1] o.s.amqp.rabbit.core.RabbitAdmin         : declaring Queue 'my.queue.orders'
2020-11-06 14:32:06.493 DEBUG 67016 --- [ask-scheduler-1] o.s.amqp.rabbit.core.RabbitTemplate      : Executing callback RabbitAdmin$$Lambda$1060/0x000000080086b840 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,2), conn: Proxy@4a5f9770 Shared Rabbit Connection: SimpleConnection@5ec708d9 [delegate=amqp://[email protected]:5672/, localPort= 62850]
2020-11-06 14:32:06.493 DEBUG 67016 --- [ask-scheduler-1] o.s.amqp.rabbit.core.RabbitAdmin         : Binding destination [my.queue.orders (QUEUE)] to exchange [mapper2.fanout.replication] with routing key []
2020-11-06 14:32:06.497 DEBUG 67016 --- [ask-scheduler-1] c.s.b.r.p.RabbitExchangeQueueProvisioner : autoBindDLQ=false for: my.queue.orders
2020-11-06 14:32:06.497 DEBUG 67016 --- [ask-scheduler-1] o.s.a.r.l.SimpleMessageListenerContainer : Changing consumers from 1 to 1
2020-11-06 14:32:06.499 ERROR 67016 --- [ask-scheduler-1] o.s.cloud.stream.binding.BindingService  : Failed to create consumer binding; retrying in 30 seconds

org.springframework.cloud.stream.binder.BinderException: Exception thrown while starting consumer: 
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:461)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:90)
    at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:143)
    at org.springframework.cloud.stream.binding.BindingService.lambda$rescheduleConsumerBinding$1(BindingService.java:201)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalStateException: Only one LastSubscriberMessageHandler is allowed
    at org.springframework.cloud.stream.binder.BinderErrorChannel.subscribe(BinderErrorChannel.java:44)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.registerErrorInfrastructure(AbstractMessageChannelBinder.java:712)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.registerErrorInfrastructure(AbstractMessageChannelBinder.java:633)
    at org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder.createConsumerEndpoint(RabbitMessageChannelBinder.java:511)
    at org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder.createConsumerEndpoint(RabbitMessageChannelBinder.java:129)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:407)
    ... 10 common frames omitted

I have a custom Processor as follows:

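import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

interface Processor {

    // binding name; matches spring.cloud.stream.bindings.input
    String INPUT = "input";

    @Input(Processor.INPUT)
    SubscribableChannel input();

    //outputs...
}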

And a MessageHandler class:

@Profile({ "orders" })
@EnableBinding(Processor.class)
@RequiredArgsConstructor
@Slf4j
public class MessageHandler {

    @Autowired
    private Processor pipe;

    @StreamListener(Processor.INPUT)
    public void handleInputMessage(final Message<String> message) {
      //do stuff...
    }
    ...
}

I am using Spring Cloud Hoxton.SR8 and spring-cloud-stream-binder-rabbit 3.0.8.

Any thoughts?

Upvotes: 1

Views: 1811

Answers (1)

Gary Russell

Reputation: 174574

Try setting the multiplex consumer property to true; you will then get a single binding that consumes from both queues.

https://docs.spring.io/spring-cloud-stream/docs/3.0.8.RELEASE/reference/html/spring-cloud-stream.html#_consumer_properties
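
As a rough sketch, applied to the application-orders.yml from the question, this might look like the fragment below. It assumes the binding is still named input and that the property is placed with the core consumer properties (under bindings.input.consumer) rather than in the rabbit-specific section; the destination and group values are copied from the question.

```yaml
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: mapper1.fanout.replication,mapper2.fanout.replication
          group: my.queue.orders
          consumer:
            # core consumer property (defaults to false): consume all listed
            # destinations through one binding instead of one binding each
            multiplex: true
```

With the property left at its default, each comma-separated destination gets its own consumer binding, which appears to be why the second one fails when it registers its error handling.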

Upvotes: 3
