Reputation: 4338
Recently, I started to play with Spring Cloud Stream and the RabbitMQ binder.
If I understood everything correctly, when two services want to pass messages, one should configure a source for sending messages and the other should configure a sink for receiving them; both should use the same channel.
I have a channel named testchannel. I noticed, though, that the source created this RabbitMQ binding: testchannel, testchannel, testchannel.default (durable), while the sink created this one: testchannel, #, testchannel.anonymous.RANDOM_ID (exclusive). I skipped the prefix for brevity.
Now, when I run both applications, the first one sends a message to the testchannel exchange, which then routes it to both queues (I assume the routing key is testchannel). The second application consumes the message from the random queue, but the message in the default queue is never consumed.
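To make the observed routing concrete, here is a minimal in-memory sketch of it. It is not RabbitMQ or binder code: the `BindingModel` and `Exchange` names are hypothetical, and pattern matching is reduced to "`#` matches everything, anything else must match exactly", which is an assumption that only covers the two bindings listed above.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

public class BindingModel {

    /** Hypothetical stand-in for one exchange and the queues bound to it. */
    public static final class Exchange {
        private final Map<String, String> patternByQueue = new LinkedHashMap<>();
        private final Map<String, Queue<String>> queues = new LinkedHashMap<>();

        /** Binds a queue to this exchange with a routing-key pattern. */
        public void bind(String queueName, String pattern) {
            patternByQueue.put(queueName, pattern);
            queues.putIfAbsent(queueName, new ArrayDeque<>());
        }

        /** Copies the message into every queue whose pattern matches the routing key. */
        public void publish(String routingKey, String message) {
            for (Map.Entry<String, String> binding : patternByQueue.entrySet()) {
                if (matches(binding.getValue(), routingKey)) {
                    queues.get(binding.getKey()).add(message);
                }
            }
        }

        /** Takes the next message from a queue, or null if the queue is empty. */
        public String consume(String queueName) {
            return queues.get(queueName).poll();
        }

        /** Number of messages still waiting in a queue. */
        public int depth(String queueName) {
            return queues.get(queueName).size();
        }
    }

    /** Simplified matching (assumption): "#" matches any key, otherwise exact match. */
    public static boolean matches(String pattern, String routingKey) {
        return pattern.equals("#") || pattern.equals(routingKey);
    }

    public static void main(String[] args) {
        Exchange testchannel = new Exchange();
        // Binding created by the source application.
        testchannel.bind("testchannel.default", "testchannel");
        // Binding created by the sink application.
        testchannel.bind("testchannel.anonymous.RANDOM_ID", "#");

        testchannel.publish("testchannel", "hello");

        // Only the sink's anonymous queue has a consumer.
        System.out.println("sink received: "
                + testchannel.consume("testchannel.anonymous.RANDOM_ID"));
        System.out.println("left in default queue: "
                + testchannel.depth("testchannel.default"));
    }
}
```

Under these assumptions the sink gets its copy of the message, while the copy routed to testchannel.default stays there because nothing reads from that queue.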
My other problem is that the second app uses only a sink, but it also creates a binding for the output channel, which is named output by default because I haven't specified anything.
I build both apps with the same Gradle script:
buildscript {
    ext {
        springBootVersion = '1.3.2.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}
apply plugin: 'java'
apply plugin: 'spring-boot'
repositories {
    mavenCentral()
    maven { url 'https://repo.spring.io/snapshot' }
    maven { url 'https://repo.spring.io/milestone' }
}
dependencies {
    compile(
        'org.springframework.cloud:spring-cloud-starter-stream-rabbit',
    )
}
dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:Brixton.BUILD-SNAPSHOT"
    }
}
First app properties:
server.port=8010
spring.cloud.stream.binder.rabbit.default.prefix=z.
spring.cloud.stream.bindings.input=start
spring.cloud.stream.bindings.output=testchannel
spring.rabbitmq.addresses=host1:5672,host2:5672
spring.rabbitmq.username=user
spring.rabbitmq.password=psw
First app source code:
@EnableBinding(Processor.class)
...
@ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public byte[] handleIncomingMessage(byte[] payload) {}
Second app properties:
server.port=8011
spring.cloud.stream.binder.rabbit.default.prefix=z.
spring.cloud.stream.bindings.input=testchannel
spring.rabbitmq.addresses=host1:5672,host2:5672
spring.rabbitmq.username=user
spring.rabbitmq.password=psw
Second app source code:
@EnableBinding(Sink.class)
...
@ServiceActivator(inputChannel = Sink.INPUT)
public void handleIncomingMessage(byte[] payload) {}
So my questions are.
Upvotes: 4
Views: 7740
Reputation: 174554
By default, each consumer gets its own queue; it's a publish/subscribe scenario.
There is a notion of a consumer group, so you can have multiple instances compete for messages from the same queue.
When binding the producer, a default queue is bound.
If you wish to subscribe to the default group, you have to set the group:
spring.cloud.stream.bindings.input.group=default
If you don't provide a group, you get an exclusive, auto-delete queue.
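As a rough illustration of why the group matters, the sketch below reproduces only the queue naming seen in the question (testchannel.default versus testchannel.anonymous.RANDOM_ID). It is not the binder's implementation: `GroupQueueNames` and `queueName` are hypothetical names, and using a UUID for the random part is an assumption.

```java
import java.util.UUID;

public class GroupQueueNames {

    /**
     * Hypothetical naming rule modelled on the queues listed in the question:
     * a grouped consumer uses "<destination>.<group>", an ungrouped one gets
     * a fresh "<destination>.anonymous.<random id>" queue.
     */
    public static String queueName(String destination, String group) {
        if (group == null || group.isEmpty()) {
            // Assumption: any unique suffix will do for the sketch.
            return destination + ".anonymous." + UUID.randomUUID();
        }
        return destination + "." + group;
    }

    public static void main(String[] args) {
        // Two instances in the same group resolve to one queue and compete for messages.
        String first = queueName("testchannel", "default");
        String second = queueName("testchannel", "default");
        System.out.println("same group shares queue: " + first.equals(second));

        // Two instances without a group get separate queues, so each receives every message.
        String anonA = queueName("testchannel", null);
        String anonB = queueName("testchannel", null);
        System.out.println("anonymous consumers share queue: " + anonA.equals(anonB));
    }
}
```

With group=default the consumer ends up on the same queue name the producer already bound, which is why the messages sitting in testchannel.default start being consumed.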
EDIT
Since the default queue is durable, you should also set
spring.cloud.stream.bindings.input.durableSubscription=true
to avoid a warning when the consumer binds and to make sure the queue is durable if the consumer binds first and the queue doesn't exist yet.
Upvotes: 2