Reputation: 35
We are in process of extending the rabbit mq source connector in SCDF running on a kafka binder and add additional functionalities that coule be extended using functions to add additional processing before message is delivered to final destination.
The question I have is, currently the output gets delivered by default to topic output when running as spring boot. Is there a way to give a named queue instead of the default output topic?
is this a known limitation as the rabbit source is based on boot 2.1.x? If so, are there alternatives to achieve a similar functionality using the latest Supplier function for rabbit mq listener.
Simple App
@SpringBootApplication
@Import(org.springframework.cloud.stream.app.rabbit.source.RabbitSourceConfiguration.class)
public class RabbitSourceApp {
public static void main(String[] args) {
SpringApplication.run(RabbitSourceApp.class, args);
}
@Bean
public Function<String, String> upper() {
return value -> value.toUpperCase();
}
}
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example.source</groupId>
<artifactId>source.sample</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.4.RELEASE</version>
<relativePath></relativePath>
</parent>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring-cloud.version>Hoxton.SR1</spring-cloud.version>
<spring-cloud.schema.version>2.2.1.RELEASE</spring-cloud.schema.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>spring-cloud-starter-stream-source-rabbit</artifactId>
<version>2.1.3.RELEASE</version>
<exclusions>
<exclusion> <!-- declare the exclusion here -->
<groupId>io.pivotal.spring.cloud</groupId>
<artifactId>spring-cloud-services-starter-config-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-schema</artifactId>
<version>${spring-cloud.schema.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
application yaml
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:29092
bindings:
upper-out-0:
destination: upperQ
function:
definition: upper
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
rabbit.queues: TestQ
Upvotes: 1
Views: 238
Reputation: 35
This is what worked for me, but not sure if this is the correct solution.
bindings:
output:
destination: upperQ
Upvotes: 1