CCC
CCC

Reputation: 2761

Spring Cloud Streams - Multiple dynamic destinations for sources and sinks

There was a change request on my system, which currently listens to multiple channels and send messages to multiple channels as well, but now the destination names will be in the database and change any time. I'm having trouble believing I'm the first one to come across this, but I see limited information out there.

All I found is these 2...
Dynamic sink destination: https://github.com/spring-cloud-stream-app-starters/router/tree/master/spring-cloud-starter-stream-sink-router, but how would that work to active listening to those channels the way it's done by @StreamListener?

Dynamic source destinations: https://github.com/spring-cloud/spring-cloud-stream-samples/blob/master/source-samples/dynamic-destination-source/, which does this

@Bean
    @ServiceActivator(inputChannel = "sourceChannel")
    public ExpressionEvaluatingRouter router() {
        ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("payload.id"));
        router.setDefaultOutputChannelName("default-output");
        router.setChannelResolver(resolver);
        return router;
    }

But what's that "payload.id"? And where are the destinations specified there??

Upvotes: 3

Views: 8034

Answers (1)

Yan Khonski
Yan Khonski

Reputation: 13083

Feel free to improve my answer, I hope it will help others.

Now the code (It worked in my debugger). This is an example, not production ready!

This is how to send a message to dynamic destination

import org.springframework.messaging.MessageChannel;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.binding.BinderAwareChannelResolver;


@Service
@EnableBinding
public class MessageSenderService {

    @Autowired
    private BinderAwareChannelResolver resolver;

    @Transactional
    public void sendMessage(final String topicName, final String payload) {
        final MessageChannel messageChannel = resolver.resolveDestination(topicName);
        messageChannel.send(new GenericMessage<String>(payload));
    }
}

And configuration for Spring Cloud Stream.

spring:
  cloud:
    stream:
      dynamicDestinations: output.topic.1,output.topic2,output.topic.3

I found here https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/index.html#dynamicdestination It will work in spring Cloud Stream version 2+. I use 2.1.2

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
    <version>2.1.2.RELEASE</version>
</dependency>

This is how to consume a message from dynamic destination

https://stackoverflow.com/a/56148190/4587961

Configuration

spring:
  cloud:
    stream:
      default:
        consumer:
          concurrency: 2
          partitioned: true
      bindings:
        # inputs
        input:
          group: application_name_group
          destination: topic-1,topic-2
          content-type: application/json;charset=UTF-8

Java consumer.

@Component
@EnableBinding(Sink.class)
public class CommonConsumer {

    private final static Logger logger = LoggerFactory.getLogger(CommonConsumer.class);

    @StreamListener(target = Sink.INPUT)
    public void consumeMessage(final Message<Object> message) {
        logger.info("Received a message: \nmessage:\n{}", message.getPayload());

        final String topic = message.getHeaders().get("kafka_receivedTopic");

        // Here I define logic which handles messages depending on message headers and topic.
        // In my case I have configuration which forwards these messages to webhooks, so I need to have mapping topic name -> webhook URI.
    }
}

Upvotes: 2

Related Questions