wst

Reputation: 4338

Spring Cloud Stream with Rabbit Binder - source/sink queue names don't match

Recently, I started to play with Spring Cloud Stream and RabbitMQ binder.

If I understood everything correctly, when two services want to pass messages, one should configure a source for sending messages and the other should configure a sink for receiving them; both should use the same channel.

I have a channel named testchannel. I noticed, though, that the source created this RabbitMQ binding:

while the sink created this RabbitMQ binding:

I skipped the prefix for brevity.

When I run both applications, the first one sends a message to the testchannel exchange, which is then routed to both queues (I assume the routing key is testchannel). The second application consumes the message from the random queue, but the message in the default queue is never consumed.

My other problem is that the second app uses only a sink, yet it also creates a binding for an output channel, which is named output by default because I haven't specified anything.

I build both apps with the same Gradle script:

buildscript {
    ext {
        springBootVersion = '1.3.2.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'spring-boot'

repositories {
    mavenCentral()
    maven { url 'https://repo.spring.io/snapshot' }
    maven { url 'https://repo.spring.io/milestone' }
}

dependencies {
    compile(
            'org.springframework.cloud:spring-cloud-starter-stream-rabbit'
    )
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:Brixton.BUILD-SNAPSHOT"
    }
}

First app properties:

server.port=8010
spring.cloud.stream.binder.rabbit.default.prefix=z.
spring.cloud.stream.bindings.input=start
spring.cloud.stream.bindings.output=testchannel
spring.rabbitmq.addresses=host1:5672,host2:5672
spring.rabbitmq.username=user
spring.rabbitmq.password=psw

First app source code:

@EnableBinding(Processor.class)
...
@ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public byte[] handleIncomingMessage(byte[] payload) {}

Second app properties:

server.port=8011
spring.cloud.stream.binder.rabbit.default.prefix=z.
spring.cloud.stream.bindings.input=testchannel
spring.rabbitmq.addresses=host1:5672,host2:5672
spring.rabbitmq.username=user
spring.rabbitmq.password=psw

Second app source code:

@EnableBinding(Sink.class)
...
@ServiceActivator(inputChannel = Sink.INPUT)
public void handleIncomingMessage(byte[] payload) {}

So my questions are.

Upvotes: 4

Views: 7740

Answers (1)

Gary Russell

Reputation: 174554

By default, consumers each get their own queue; it's a publish/subscribe scenario.

There is a notion of a consumer group so you can have multiple instances compete for messages from the same queue.
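
To make the difference concrete, here is a toy model in plain Java. It is only a sketch of the semantics described above, not the binder's code: the GroupDemo class, its bind and publish methods, and the group name anonymous.1 are all made up for illustration. Each group gets one queue, every queue gets its own copy of a published message, and a consumer only drains the queue of its own group.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

/** Toy model (hypothetical, not binder code): an exchange that copies each message to every bound queue. */
public class GroupDemo {
    // One queue per group name; instances that use the same group share that queue.
    private final Map<String, Queue<String>> queuesByGroup = new LinkedHashMap<>();

    /** Binds a group, creating its queue on first use, and returns that queue. */
    public Queue<String> bind(String group) {
        return queuesByGroup.computeIfAbsent(group, g -> new ArrayDeque<>());
    }

    /** Publishes a message: every bound queue receives its own copy. */
    public void publish(String message) {
        for (Queue<String> queue : queuesByGroup.values()) {
            queue.add(message);
        }
    }

    public static void main(String[] args) {
        GroupDemo exchange = new GroupDemo();
        Queue<String> defaultQueue = exchange.bind("default");       // bound when the producer binds
        Queue<String> anonymousQueue = exchange.bind("anonymous.1"); // consumer with no group set

        exchange.publish("hello");
        anonymousQueue.poll(); // the sink reads only from its own queue

        System.out.println("left in default queue: " + defaultQueue.size());
        System.out.println("left in anonymous queue: " + anonymousQueue.size());
    }
}
```

In this model the copy in the default queue stays there until some consumer binds to the default group, which is the behaviour the question describes.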

When binding the producer, a default queue is bound.

If you wish to subscribe to the default group, you have to set the group:

spring.cloud.stream.bindings.input.group=default

If you don't provide a group, you get an exclusive, auto-delete queue.
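
As a rough illustration of why the two names differ, the sketch below assumes the queue name is built as prefix + destination + "." + group, and that a missing group is replaced by a generated anonymous one. Both the naming scheme and the QueueNames helper are assumptions for illustration; check the names your broker actually shows rather than relying on this.

```java
import java.util.UUID;

/** Hypothetical helper that mimics an assumed naming scheme; it is not part of the binder. */
public class QueueNames {

    /** Returns prefix + destination + "." + group, generating an anonymous group if none is given. */
    public static String queueName(String prefix, String destination, String group) {
        String effectiveGroup = (group == null || group.isEmpty())
                ? "anonymous." + UUID.randomUUID() // assumed format of the generated name
                : group;
        return prefix + destination + "." + effectiveGroup;
    }

    public static void main(String[] args) {
        // With the group set, every instance resolves to the same queue name.
        System.out.println(queueName("z.", "testchannel", "default"));
        // Without a group, each call yields a different, random queue name.
        System.out.println(queueName("z.", "testchannel", null));
    }
}
```

Under that assumption, setting the group to default on the sink makes it resolve to the same queue the producer binding created, instead of a fresh random one.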

EDIT

Since the default queue is durable, you should also set

spring.cloud.stream.bindings.input.durableSubscription=true

to avoid a warning when the consumer binds and to make sure the queue is durable if the consumer binds first and the queue doesn't exist yet.

Upvotes: 2
