Reputation: 2761
There was a change request on my system, which currently listens to multiple channels and send messages to multiple channels as well, but now the destination names will be in the database and change any time. I'm having trouble believing I'm the first one to come across this, but I see limited information out there.
All I found is these 2...
Dynamic sink destination:
https://github.com/spring-cloud-stream-app-starters/router/tree/master/spring-cloud-starter-stream-sink-router, but how would that work to active listening to those channels the way it's done by @StreamListener?
Dynamic source destinations: https://github.com/spring-cloud/spring-cloud-stream-samples/blob/master/source-samples/dynamic-destination-source/, which does this
@Bean
@ServiceActivator(inputChannel = "sourceChannel")
public ExpressionEvaluatingRouter router() {
ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("payload.id"));
router.setDefaultOutputChannelName("default-output");
router.setChannelResolver(resolver);
return router;
}
But what's that "payload.id"? And where are the destinations specified there??
Upvotes: 3
Views: 8034
Reputation: 13083
Feel free to improve my answer, I hope it will help others.
Now the code (It worked in my debugger). This is an example, not production ready!
import org.springframework.messaging.MessageChannel;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.binding.BinderAwareChannelResolver;
@Service
@EnableBinding
public class MessageSenderService {
@Autowired
private BinderAwareChannelResolver resolver;
@Transactional
public void sendMessage(final String topicName, final String payload) {
final MessageChannel messageChannel = resolver.resolveDestination(topicName);
messageChannel.send(new GenericMessage<String>(payload));
}
}
And configuration for Spring Cloud Stream.
spring:
cloud:
stream:
dynamicDestinations: output.topic.1,output.topic2,output.topic.3
I found here https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/index.html#dynamicdestination It will work in spring Cloud Stream version 2+. I use 2.1.2
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<version>2.1.2.RELEASE</version>
</dependency>
https://stackoverflow.com/a/56148190/4587961
Configuration
spring:
cloud:
stream:
default:
consumer:
concurrency: 2
partitioned: true
bindings:
# inputs
input:
group: application_name_group
destination: topic-1,topic-2
content-type: application/json;charset=UTF-8
Java consumer.
@Component
@EnableBinding(Sink.class)
public class CommonConsumer {
private final static Logger logger = LoggerFactory.getLogger(CommonConsumer.class);
@StreamListener(target = Sink.INPUT)
public void consumeMessage(final Message<Object> message) {
logger.info("Received a message: \nmessage:\n{}", message.getPayload());
final String topic = message.getHeaders().get("kafka_receivedTopic");
// Here I define logic which handles messages depending on message headers and topic.
// In my case I have configuration which forwards these messages to webhooks, so I need to have mapping topic name -> webhook URI.
}
}
Upvotes: 2