Dharita Chokshi

Reputation: 1263

How to send messages to one of the multiple topics based on condition in Spring Cloud Stream Kafka application

Currently I have a Spring Cloud Function that consumes from one topic and publishes to another. Now I have multiple output topics and need to publish each message to one of them, based on certain checks inside the function. How can I achieve this? Here is the current implementation.

@Bean("producerBean")
public Function<Message<SourceMessage>, Message<SinkMessage>> producerBean(SinkService<SourceMessage> sinkService) {
    return sinkService::processMessage;
}

@Service("SinkService")
public class SinkService<T> {

    public Message<SinkMessage> processMessage(Message<SourceMessage> message) {
        log.info("Message consumed at {} \n{}", message.getHeaders().getTimestamp(), message.getPayload());
        try {
            if (message.getPayload().isManaged()) {
                /*
                Need to add one more check here.
                if (type==2)
                    send to topic1
                else if(type==4)
                    send to topic2
                else
                    Just log the type, do not send to any topic.
                 */
                Message<SinkMessage> output = new GenericMessage<>(new SinkMessage());
                output.getPayload().setPayload(message.getPayload());
                return output;
            }
        } catch (Exception exception) {
            exception.printStackTrace();
        }
        return null;
    }
}

application.properties

spring.cloud.stream.kafka.binder.brokers=${bootstrap.servers}
spring.cloud.stream.kafka.binder.configuration.enable.idempotence=false
spring.cloud.stream.binders.test_binder.type=kafka

spring.cloud.stream.bindings.producerBean.binder=test_binder
spring.cloud.stream.bindings.producerBean-in-0.destination=${input-destination}
spring.cloud.stream.bindings.producerBean-in-0.group=${input-group}
spring.cloud.stream.bindings.producerBean-out-0.destination=topic1
spring.cloud.stream.bindings.producerBean-out-1.destination=topic2

pom.xml

<dependency>
   <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    <version>3.2.5</version>
</dependency>

Upvotes: 1

Views: 1263

Answers (1)

Axiomatic-1

Reputation: 61

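You can use StreamBridge and pass the Kafka topic name as the destination; Spring Cloud Stream will create the binding automatically at runtime. By default this also creates the topic if it does not exist. You can turn that off (for the Kafka binder, with spring.cloud.stream.kafka.binder.autoCreateTopics=false).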

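@Autowired
private StreamBridge streamBridge; // not final: field injection cannot set a final field

public void sendDynamically(Message<?> message, String topicName) {
    // send(destination, data): the topic name is the first argument, the message the second
    streamBridge.send(topicName, message);
}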
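
Applied to the checks in the question, the routing decision could look roughly like the sketch below. It is plain Java so it runs without Spring: the TopicRouter class, its method names, and the int type parameter are assumptions for illustration (the question does not show where type comes from). The sender argument has the same shape as StreamBridge.send(String, Object), so in the real application you would most likely pass streamBridge::send.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;

// Hypothetical helper, not part of Spring Cloud Stream.
public class TopicRouter {

    // Maps the message type to a destination topic.
    // An empty result means "just log, do not send anywhere".
    public static Optional<String> topicFor(int type) {
        switch (type) {
            case 2:  return Optional.of("topic1");
            case 4:  return Optional.of("topic2");
            default: return Optional.empty();
        }
    }

    // 'sender' stands in for StreamBridge.send(destination, data).
    // Returns false when nothing was sent.
    public static boolean route(int type, Object payload,
                                BiFunction<String, Object, Boolean> sender) {
        Optional<String> topic = topicFor(type);
        if (!topic.isPresent()) {
            System.out.println("Not forwarding message of type " + type);
            return false;
        }
        return sender.apply(topic.get(), payload);
    }

    public static void main(String[] args) {
        // Fake sender that records what would have been published.
        List<String> sent = new ArrayList<>();
        BiFunction<String, Object, Boolean> fakeSender =
                (topic, data) -> sent.add(topic + ":" + data);

        route(2, "a", fakeSender); // goes to topic1
        route(4, "b", fakeSender); // goes to topic2
        route(7, "c", fakeSender); // only logged
        System.out.println(sent);
    }
}
```

Since the sending now happens inside the method, the bean would probably become a Consumer<Message<SourceMessage>> rather than a Function, and the producerBean-out-0 / producerBean-out-1 bindings would no longer be needed.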

https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_streambridge_and_dynamic_destinations

Upvotes: 1
