ShankPossible

Reputation: 497

Spring Cloud Stream connect to multiple hosts for single binder (RabbitMQ)

We are using Spring Cloud Stream to listen to multiple RabbitMQ queues, specifically with the Spring Cloud Function (SCF) model.

As long as there was a single node/host, it worked fine (application.yml snippet shared below).

However, as soon as we try to connect to multiple nodes, it fails. Can someone explain how to set this up, or point to a sample in the Spring Cloud documentation?

The following configuration works as expected:

spring:
  cloud:
    stream:
      function:
        definition: function1;function2;function3
      bindings:
        function1-in-0:
          group: allocation
          destination: destinationExchange
          binder: rabbit
        function2-in-0:
          group: allocation
          destination: destinationExchange
          binder: rabbit
        function3-in-0:
          group: allocation
          destination: destinationExchange
          binder: rabbit
      rabbit:
        bindings:
          function1-in-0:
            consumer:
              bindingRoutingKey: routing.key.1
          function2-in-0:
            consumer:
              bindingRoutingKey: routing.key.2
          function3-in-0:
            consumer:
              bindingRoutingKey: routing.key.3
        binder:
          nodes: address1

Basically, it needs to be something like the following:

spring:
  cloud:
    stream:
      function:
        definition: function1;function2;function3
      bindings:
        function1-in-0:
          group: allocation
          destination: destinationExchange
          binder: rabbit1
        function2-in-0:
          group: allocation
          destination: destinationExchange
          binder: rabbit2
        function3-in-0:
          group: allocation
          destination: destinationExchange
          binder: rabbit3
      binder:
        rabbit1:
          function1-in-0:
            consumer:
              bindingRoutingKey: routing.key.1
          binder:
            nodes: address1
        rabbit2:
          function2-in-0:
            consumer:
              bindingRoutingKey: routing.key.2
          binder:
            nodes: address2
        rabbit3:
          function3-in-0:
            consumer:
              bindingRoutingKey: routing.key.3
          binder:
            nodes: address3

With just the following addition:

binders:
    rabbit1:
      type: rabbit
      environment:
        spring.spring.cloud.stream.kafka:
          binder:
            nodes: localhost

I am getting this error:

o.s.boot.SpringApplication               : Application run failed

org.springframework.context.ApplicationContextException: Failed to start bean 'inputBindingLifecycle'; nested exception is java.lang.IllegalStateException: Unknown binder configuration: rabbit
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-5.3.8.jar:5.3.8]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54) ~[spring-context-5.3.8.jar:5.3.8]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.3.8.jar:5.3.8]
    at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-5.3.8.jar:5.3.8]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-5.3.8.jar:5.3.8]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935) ~[spring-context-5.3.8.jar:5.3.8]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) ~[spring-context-5.3.8.jar:5.3.8]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:145) ~[spring-boot-2.5.2.jar:2.5.2]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:754) ~[spring-boot-2.5.2.jar:2.5.2]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:434) ~[spring-boot-2.5.2.jar:2.5.2]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:338) ~[spring-boot-2.5.2.jar:2.5.2]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1343) ~[spring-boot-2.5.2.jar:2.5.2]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1332) ~[spring-boot-2.5.2.jar:2.5.2]
    at com.gap.pem.Application.main(Application.java:14) ~[main/:na]
Caused by: java.lang.IllegalStateException: Unknown binder configuration: rabbit
    at org.springframework.util.Assert.state(Assert.java:76) ~[spring-core-5.3.8.jar:5.3.8]
    at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinderInstance(DefaultBinderFactory.java:255) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
    at org.springframework.cloud.stream.binder.DefaultBinderFactory.doGetBinder(DefaultBinderFactory.java:224) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
    at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinder(DefaultBinderFactory.java:152) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
    at org.springframework.cloud.stream.binding.BindingService.getBinder(BindingService.java:386) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
    at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:103) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
    at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindInputs(AbstractBindableProxyFactory.java:118) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
    at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
    at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[na:na]
    at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
    at org.springframework.cloud.stream.binding.InputBindingLifecycle.start(InputBindingLifecycle.java:34) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.8.jar:5.3.8]
    ... 14 common frames omitted


Process finished with exit code 1

We have the following dependencies:

implementation 'org.springframework.cloud:spring-cloud-stream'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-rabbit'

Upvotes: 1

Views: 2051

Answers (2)

GJohannes

Reputation: 1763

I struggled a lot to get this running, especially with routing keys. In the end, my solution was a bit different from what is shown here. I am posting it in the hope that it helps someone in the future:

spring:
  cloud:
    stream:
      binders:
        rabbit1:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                addresses: some.server.com:5672,some.other.server.com:5672
                username: someUserName
                password: someUserPassword
        rabbit2:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                addresses: some.rabbit2.server.us:5672,someother.rabbit2.server.de:5672
                username: secondRabbitUserName
                password: secondRabbitPassword
      function:
        definition: someReceiver;anotherRec
      rabbit:
        bindings:
          someReceiver-in-0:
            consumer:
              auto-bind-dlq: true
              republishToDlq: true
              bindingRoutingKey: some.routing.key.#
          anotherRec-in-0:
            consumer:
              bindingRoutingKey: finished.#
          firstAction-out-0:
            producer:
              routingKeyExpression: "'switch.'+headers.someValue+'.'+headers.someOtherValue"
              bindingRoutingKey: switch.#
          userNotification-out-0:
            producer:
              routingKeyExpression: "'switch.someKeyExpression'"
              bindingRoutingKey: switch.#
      bindings:
        anotherRec-in-0:
          destination: reciving.exchange.name
          group: some-queue-name-v7
          binder: rabbit1
        firstAction-out-0:
          destination: some.exhange.name.1
          binder: rabbit1
        someReceiver-in-0:
          destination: another.exchange.name.1
          group: queueName
          binder: rabbit2
        userNotification-out-0:
          destination: the.exchange.name
          binder: rabbit2
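
Applied to the three functions from the question, the same layout might look like the sketch below. It is untested: `address1` to `address3` are the question's placeholders, the port `5672` is an assumption, and credentials are left out on the assumption that the broker defaults apply. Note that every binding names a binder declared under `binders`. As far as I understand the 3.1.x binder lookup, once named binders are declared, the plain type name `rabbit` is no longer a known binder name, so a binding that still says `binder: rabbit` would produce the `Unknown binder configuration: rabbit` error from the question.

```yaml
spring:
  cloud:
    stream:
      function:
        definition: function1;function2;function3
      binders:
        # One named binder per broker; the names are arbitrary labels.
        rabbit1:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                addresses: address1:5672   # placeholder host, assumed port
        rabbit2:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                addresses: address2:5672
        rabbit3:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                addresses: address3:5672
      bindings:
        # Each binding points at one of the binder names declared above.
        function1-in-0:
          group: allocation
          destination: destinationExchange
          binder: rabbit1
        function2-in-0:
          group: allocation
          destination: destinationExchange
          binder: rabbit2
        function3-in-0:
          group: allocation
          destination: destinationExchange
          binder: rabbit3
      rabbit:
        bindings:
          # Rabbit-specific consumer settings stay keyed by binding name.
          function1-in-0:
            consumer:
              bindingRoutingKey: routing.key.1
          function2-in-0:
            consumer:
              bindingRoutingKey: routing.key.2
          function3-in-0:
            consumer:
              bindingRoutingKey: routing.key.3
```

The routing keys stay under `spring.cloud.stream.rabbit.bindings`, exactly as in the single-host config, because they belong to the binding rather than to the broker connection.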

Upvotes: 1

Sujit kumar

Reputation: 131

Adding the binders config for both rabbit1 and rabbit2 resolved the issue.

Below is the sample config I tried; with it I was able to consume messages successfully:

spring:
  cloud:
    stream:
      function:
        definition: processFirstConsumer;processSecondConsumer
      bindings:
        processFirstConsumer-in-0:
          group: allocation
          destination: userMessage1
          binder: rabbit1
        processSecondConsumer-in-0:
          group: allocation
          destination: userMessage2
          binder: rabbit2
      binder:
        rabbit1:
          processFirstConsumer-in-0:
            consumer:
              bindingRoutingKey: routing.key.1
          binder:
            nodes: address1
        rabbit2:
          processSecondConsumer-in-0:
            consumer:
              bindingRoutingKey: routing.key.2
          binder:
            nodes: address2
      binders:
        rabbit1:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
                virtual-host: /
        rabbit2:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
                virtual-host: /
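
One caveat about the `binder:` block in this config: I am not aware of a `spring.cloud.stream.binder.<name>` property, so those routing keys are probably ignored silently and the queues bound with the default key. Assuming the Rabbit binder reads per-binding consumer settings from `spring.cloud.stream.rabbit.bindings`, as the single-host config in the question does, the routing keys for these two consumers would go here instead (a sketch, not verified against a running broker):

```yaml
spring:
  cloud:
    stream:
      rabbit:
        bindings:
          # Keyed by binding name, independent of which binder the binding uses.
          processFirstConsumer-in-0:
            consumer:
              bindingRoutingKey: routing.key.1
          processSecondConsumer-in-0:
            consumer:
              bindingRoutingKey: routing.key.2
```

The connection details for each broker remain under `binders.<name>.environment`, so the `nodes: address1` and `nodes: address2` entries are not needed either.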

Upvotes: 0
