Bram
Bram

Reputation: 1196

How to listen to two RabbitMQ queues with spring-cloud-stream

I've got a working application that listens to a single RabbitMQ queue.

However, when I add another bean that consumes messages and try to bind that to another queue, neither of the queues are created in RabbitMQ and when creating them manually no messages are consumed from these queues.

Small kotlin project I created to demonstrate the issue:

@SpringBootApplication
class SpringCloudStreamTwoRabbitConsumersApplication

fun main(args: Array<String>) {
    runApplication<SpringCloudStreamTwoRabbitConsumersApplication>(*args)
}
package com.example.springcloudstreamtworabbitconsumers

import org.springframework.context.annotation.Bean
import org.springframework.messaging.Message
import org.springframework.stereotype.Component
import java.util.function.Consumer

@Component
class Listener1Config {

    @Bean
    fun listener1(): Consumer<Message<String>> {
        return Consumer { input -> println(input) }
    }

}
package com.example.springcloudstreamtworabbitconsumers

import org.springframework.context.annotation.Bean
import org.springframework.messaging.Message
import org.springframework.stereotype.Component
import java.util.function.Consumer

@Component
class Listener2Config {

    @Bean
    fun listener2(): Consumer<Message<String>> {
        return Consumer { input -> println(input) }
    }

}

application.properties:

# Rabbit properties
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

# Listener 1
spring.cloud.stream.bindings.listener1-in-0.destination=exchange1
spring.cloud.stream.bindings.listener1-in-0.group=exchange1-queue
spring.cloud.stream.rabbit.bindings.listener1-in-0.consumer.queueNameGroupOnly=true
spring.cloud.stream.rabbit.bindings.listener1-in-0.consumer.binding-routing-key-delimiter=,
spring.cloud.stream.rabbit.bindings.listener1-in-0.consumer.bindingRoutingKey=binding.key.1,binding.key.1.1
spring.cloud.stream.rabbit.bindings.listener1-in-0.consumer.exchangeType=topic
spring.cloud.stream.rabbit.bindings.listener1-in-0.consumer.autoBindDlq=true

# Listener 2
spring.cloud.stream.bindings.listener2-in-0.destination=exchange2
spring.cloud.stream.bindings.listener2-in-0.group=exchange2-queue
spring.cloud.stream.rabbit.bindings.listener2-in-0.consumer.queueNameGroupOnly=true
spring.cloud.stream.rabbit.bindings.listener2-in-0.consumer.binding-routing-key-delimiter=,
spring.cloud.stream.rabbit.bindings.listener2-in-0.consumer.bindingRoutingKey=binding.key.2,binding.key.2.1
spring.cloud.stream.rabbit.bindings.listener2-in-0.consumer.exchangeType=topic
spring.cloud.stream.rabbit.bindings.listener2-in-0.consumer.autoBindDlq=true

build.gradle.kts:

import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
    id("org.springframework.boot") version "2.4.3"
    id("io.spring.dependency-management") version "1.0.11.RELEASE"
    kotlin("jvm") version "1.4.30"
    kotlin("plugin.spring") version "1.4.30"
}

group = "com.example"
version = "0.0.1-SNAPSHOT"
java.sourceCompatibility = JavaVersion.VERSION_1_8

repositories {
    mavenCentral()
}

extra["springCloudVersion"] = "2020.0.1"

dependencies {
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
    implementation("org.springframework.boot:spring-boot-starter-webflux")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    implementation("org.springframework.cloud:spring-cloud-stream")
    implementation("org.springframework.cloud:spring-cloud-stream-binder-rabbit")
    testImplementation("org.springframework.boot:spring-boot-starter-test")
}

dependencyManagement {
    imports {
        mavenBom("org.springframework.cloud:spring-cloud-dependencies:${property("springCloudVersion")}")
    }
}

tasks.withType<KotlinCompile> {
    kotlinOptions {
        freeCompilerArgs = listOf("-Xjsr305=strict")
        jvmTarget = "1.8"
    }
}

tasks.withType<Test> {
    useJUnitPlatform()
}

When I comment out one of the listener beans the other one works as expected. However with both beans active, no queue is created in RabbitMQ, nor are messages read from the queues if I create them manually and send messages to the exchange.

What am I doing wrong here?

Upvotes: 0

Views: 1846

Answers (1)

Gary Russell
Gary Russell

Reputation: 174564

The framework can only detect a single function. When you have multiple, you need to specify:

spring.cloud.function.definition=listener1;listener2

https://docs.spring.io/spring-cloud-stream/docs/3.1.1/reference/html/spring-cloud-stream.html#spring_cloud_function

In the event you only have single bean of type java.util.function.[Supplier/Function/Consumer], you can skip the spring.cloud.function.definition property, since such functional bean will be auto-discovered. However, it is considered best practice to use such property to avoid any confusion.

Upvotes: 2

Related Questions