Reputation: 497
We are using Spring Cloud Stream, specifically the SCF (Spring Cloud Function) model, to listen to multiple RabbitMQ queues.
As long as there was a single node/host, it worked fine (application.yml snippet shared below).
However, as soon as we try to connect to multiple nodes, it fails. Can someone explain how to configure this, or point to a related sample in the Spring Cloud documentation?
The following configuration works as expected:
# Working setup from the question: every input binding uses the single
# `rabbit` binder, which points at one node.
spring:
  cloud:
    stream:
      function:
        # Functions to bind, separated by `;`.
        definition: function1;function2;function3
      # Generic binding settings, keyed by binding name.
      bindings:
        function1-in-0:
          group: allocation
          destination: destinationExchange
          binder: rabbit
        function2-in-0:
          group: allocation
          destination: destinationExchange
          binder: rabbit
        function3-in-0:
          group: allocation
          destination: destinationExchange
          binder: rabbit
      rabbit:
        # Rabbit-specific consumer settings, keyed by the same binding names.
        bindings:
          function1-in-0:
            consumer:
              bindingRoutingKey: routing.key.1
          function2-in-0:
            consumer:
              bindingRoutingKey: routing.key.2
          function3-in-0:
            consumer:
              bindingRoutingKey: routing.key.3
        binder:
          nodes: address1
Basically, it needs to be something like the following:
# Sketch of the layout the question is asking for: one binder per binding,
# each pointing at its own node.
# NOTE(review): indentation reconstructed from key order; the original post
# lost it. Keys and values are unchanged.
spring:
  cloud:
    stream:
      function:
        definition: function1;function2;function3
      bindings:
        function1-in-0:
          group: allocation
          destination: destinationExchange
          binder: rabbit1
        function2-in-0:
          group: allocation
          destination: destinationExchange
          binder: rabbit2
        function3-in-0:
          group: allocation
          destination: destinationExchange
          binder: rabbit3
      binder:
        rabbit1:
          function1-in-0:
            consumer:
              bindingRoutingKey: routing.key.1
          binder:
            nodes: address1
        rabbit2:
          function2-in-0:
            consumer:
              bindingRoutingKey: routing.key.2
          binder:
            nodes: address2
        rabbit3:
          function3-in-0:
            consumer:
              bindingRoutingKey: routing.key.3
          binder:
            nodes: address3
With just the following addition:
# The `binders` addition exactly as posted in the question (indentation
# restored only). Only `rabbit1` is declared here.
binders:
  rabbit1:
    type: rabbit
    environment:
      # NOTE(review): kept verbatim because the reported error depends on it.
      # The key has a doubled `spring.` prefix and uses the kafka namespace
      # although the binder type is rabbit; the answers in this thread use
      # `environment.spring.rabbitmq` instead.
      spring.spring.cloud.stream.kafka:
        binder:
          nodes: localhost
I am getting this error:
o.s.boot.SpringApplication : Application run failed
org.springframework.context.ApplicationContextException: Failed to start bean 'inputBindingLifecycle'; nested exception is java.lang.IllegalStateException: Unknown binder configuration: rabbit
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-5.3.8.jar:5.3.8]
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54) ~[spring-context-5.3.8.jar:5.3.8]
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.3.8.jar:5.3.8]
at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-5.3.8.jar:5.3.8]
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-5.3.8.jar:5.3.8]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935) ~[spring-context-5.3.8.jar:5.3.8]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) ~[spring-context-5.3.8.jar:5.3.8]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:145) ~[spring-boot-2.5.2.jar:2.5.2]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:754) ~[spring-boot-2.5.2.jar:2.5.2]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:434) ~[spring-boot-2.5.2.jar:2.5.2]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:338) ~[spring-boot-2.5.2.jar:2.5.2]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1343) ~[spring-boot-2.5.2.jar:2.5.2]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1332) ~[spring-boot-2.5.2.jar:2.5.2]
at com.gap.pem.Application.main(Application.java:14) ~[main/:na]
Caused by: java.lang.IllegalStateException: Unknown binder configuration: rabbit
at org.springframework.util.Assert.state(Assert.java:76) ~[spring-core-5.3.8.jar:5.3.8]
at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinderInstance(DefaultBinderFactory.java:255) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
at org.springframework.cloud.stream.binder.DefaultBinderFactory.doGetBinder(DefaultBinderFactory.java:224) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinder(DefaultBinderFactory.java:152) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
at org.springframework.cloud.stream.binding.BindingService.getBinder(BindingService.java:386) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:103) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindInputs(AbstractBindableProxyFactory.java:118) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[na:na]
at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
at org.springframework.cloud.stream.binding.InputBindingLifecycle.start(InputBindingLifecycle.java:34) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.8.jar:5.3.8]
... 14 common frames omitted
Process finished with exit code 1
We have the following dependencies available:
implementation 'org.springframework.cloud:spring-cloud-stream'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-rabbit'
Upvotes: 1
Views: 2051
Reputation: 1763
I struggled a lot getting this to run, especially with routing keys. In the end my solution was a bit different from what is shown here. I am posting it in the hope that it will help someone in the future:
# Two named Rabbit binders, each with its own connection settings, plus
# per-binding routing keys. Every binding selects its binder by name.
spring:
  cloud:
    stream:
      binders:
        rabbit1:
          type: rabbit
          # Connection properties scoped to this binder only.
          environment:
            spring:
              rabbitmq:
                addresses: some.server.com:5672,some.other.server.com:5672
                username: someUserName
                password: someUserPassword
        rabbit2:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                addresses: some.rabbit2.server.us:5672,someother.rabbit2.server.de:5672
                username: secondRabbitUserName
                password: secondRabbitPassword
      function:
        definition: someReceiver;anotherRec
      rabbit:
        # Rabbit-specific settings, keyed by binding name.
        # The `#` in the routing keys below is part of the pattern; it is not
        # preceded by a space, so YAML does not treat it as a comment.
        bindings:
          someReceiver-in-0:
            consumer:
              auto-bind-dlq: true
              republishToDlq: true
              bindingRoutingKey: some.routing.key.#
          anotherRec-in-0:
            consumer:
              bindingRoutingKey: finished.#
          firstAction-out-0:
            producer:
              routingKeyExpression: "'switch.'+headers.someValue+'.'+headers.someOtherValue"
              bindingRoutingKey: switch.#
          userNotification-out-0:
            producer:
              routingKeyExpression: "'switch.someKeyExpression'"
              bindingRoutingKey: switch.#
      # Generic binding settings: destination, group and which binder to use.
      bindings:
        anotherRec-in-0:
          destination: reciving.exchange.name
          group: some-queue-name-v7
          binder: rabbit1
        firstAction-out-0:
          destination: some.exhange.name.1
          binder: rabbit1
        someReceiver-in-0:
          destination: another.exchange.name.1
          group: queueName
          binder: rabbit2
        userNotification-out-0:
          destination: the.exchange.name
          binder: rabbit2
Upvotes: 1
Reputation: 131
Adding the binders config for both rabbit1 and rabbit2 resolved the issue.
Below is the sample config I tried, with which I was able to consume messages successfully:
# Sample from the answer: two consumers, each bound to its own named binder,
# with both binders declared under `binders`.
# NOTE(review): indentation reconstructed from key order; the original post
# lost it. Keys and values are unchanged.
spring:
  cloud:
    stream:
      function:
        definition: processFirstConsumer;processSecondConsumer
      bindings:
        processFirstConsumer-in-0:
          group: allocation
          destination: userMessage1
          binder: rabbit1
        processSecondConsumer-in-0:
          group: allocation
          destination: userMessage2
          binder: rabbit2
      binder:
        rabbit1:
          processFirstConsumer-in-0:
            consumer:
              bindingRoutingKey: routing.key.1
          binder:
            nodes: address1
        rabbit2:
          processSecondConsumer-in-0:
            consumer:
              bindingRoutingKey: routing.key.2
          binder:
            nodes: address2
      # Declares each binder name referenced by the bindings above.
      binders:
        rabbit1:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
                virtual-host: /
        rabbit2:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
                virtual-host: /
Upvotes: 0