Id View

Reputation: 35

Named Destination for Spring Cloud Stream Source Extension

We are extending the RabbitMQ source connector in SCDF, running on the Kafka binder, so that functions can add extra processing before a message is delivered to its final destination.

My question: currently, when the app runs as a standalone Spring Boot application, the output is delivered by default to the topic named output. Is there a way to specify a named destination instead of the default output topic?

Is this a known limitation because the rabbit source is based on Boot 2.1.x? If so, are there alternatives that achieve similar functionality using the newer Supplier-based function model for a RabbitMQ listener?

Simple App

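import java.util.function.Function;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;

@SpringBootApplication
@Import(org.springframework.cloud.stream.app.rabbit.source.RabbitSourceConfiguration.class)
public class RabbitSourceApp {
    public static void main(String[] args) {
        SpringApplication.run(RabbitSourceApp.class, args);
    }

    // Extra processing step applied to each message before it is sent on.
    @Bean
    public Function<String, String> upper() {
        return value -> value.toUpperCase();
    }

}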

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example.source</groupId>
    <artifactId>source.sample</artifactId>
    <version>1.0-SNAPSHOT</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.4.RELEASE</version>
        <relativePath></relativePath>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring-cloud.version>Hoxton.SR1</spring-cloud.version>
        <spring-cloud.schema.version>2.2.1.RELEASE</spring-cloud.schema.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud.stream.app</groupId>
            <artifactId>spring-cloud-starter-stream-source-rabbit</artifactId>
            <version>2.1.3.RELEASE</version>
            <exclusions>
                <exclusion>  <!-- declare the exclusion here -->
                    <groupId>io.pivotal.spring.cloud</groupId>
                    <artifactId>spring-cloud-services-starter-config-client</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-schema</artifactId>
            <version>${spring-cloud.schema.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>

application yaml

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:29092
      bindings:
        upper-out-0:
          destination: upperQ
      function:
        definition: upper
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
rabbit.queues: TestQ

Upvotes: 1

Views: 238

Answers (1)

Id View

Reputation: 35

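This is what worked for me, though I am not sure it is the correct solution: set the destination on the binding named output rather than on upper-out-0.

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: upperQ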
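
On the Supplier part of the question, here is a minimal plain-Java sketch (no Spring involved) of what piping a supplier through the upper function amounts to. The class name, the compose helper, and the rabbitMessages supplier are all hypothetical stand-ins for illustration; a real RabbitMQ-backed supplier and the way Spring Cloud Stream names its output binding for such a pipeline are not shown here and should be checked against the version in use.

```java
import java.util.function.Function;
import java.util.function.Supplier;

public class SupplierCompositionSketch {

    // Same transformation as the upper() bean in the question.
    static Function<String, String> upper() {
        return value -> value.toUpperCase();
    }

    // Hypothetical helper: pipes whatever the source supplies through fn.
    static <T, R> Supplier<R> compose(Supplier<T> source, Function<T, R> fn) {
        return () -> fn.apply(source.get());
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for a RabbitMQ-backed supplier (assumption).
        Supplier<String> rabbitMessages = () -> "hello from TestQ";

        // Each call to get() pulls one message and upper-cases it.
        Supplier<String> pipeline = compose(rabbitMessages, upper());
        System.out.println(pipeline.get());
    }
}
```

The point of the sketch is only that the function runs between the source and the outbound destination; which binding name carries the destination property depends on whether the source is the older annotation-based one, as in the question, or a Supplier bean.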

Upvotes: 1
