Reputation: 5776
I am using Spring Cloud Stream and want to programmatically create and bind channels. My use case is that during application startup I receive the dynamic list of Kafka topics to subscribe to. How can I then create a channel for each topic?
Upvotes: 6
Views: 8504
Reputation: 1
Based on Sash's answer, I have written a demo that can dynamically register consumers.
@Autowired
void dynamicBinding(BindingService bindingService,
BindingServiceProperties bindingServiceProperties,
RabbitExtendedBindingProperties rabbitExtendedBindingProperties,
BindingTargetFactory channelFactory) {
final var CONSUMER_NAME = "onRefresh"; // SCS Consumer Name
final var RABBIT_EXCHANGE = "ouroboros.app.control"; // RabbitMQ Exchange Name
// RabbitMQ Properties
var rabbitConsumerProperties = new RabbitConsumerProperties();
var consumerProperties = new ExtendedConsumerProperties<>(rabbitConsumerProperties);
var rabbitBindingProperties = new RabbitBindingProperties();
var bindingProperties = new BindingProperties();
rabbitConsumerProperties.setExchangeType("fanout");
rabbitConsumerProperties.setAcknowledgeMode(AcknowledgeMode.MANUAL);
rabbitConsumerProperties.setRequeueRejected(false);
rabbitConsumerProperties.setRepublishToDlq(true);
rabbitConsumerProperties.setAutoBindDlq(true);
rabbitConsumerProperties.setDeclareDlx(true);
rabbitConsumerProperties.setDeadLetterExchange("ouroboros.app.control.dlx");
rabbitConsumerProperties.setDeadLetterExchangeType("topic");
consumerProperties.populateBindingName(CONSUMER_NAME);
bindingProperties.setDestination(RABBIT_EXCHANGE);
bindingProperties.setConsumer(consumerProperties);
rabbitBindingProperties.setConsumer(rabbitConsumerProperties);
bindingServiceProperties.getBindings().put(CONSUMER_NAME, bindingProperties);
rabbitExtendedBindingProperties.setBindings(Collections.singletonMap(CONSUMER_NAME, rabbitBindingProperties));
// Channel Name same as SCS Consumer name
var channel = (SubscribableChannel)channelFactory.createInput(CONSUMER_NAME);
// bind consumer
bindingService.bindConsumer(channel, CONSUMER_NAME);
// subscribe channel
channel.subscribe((message) -> {
logger.info("onRefresh: {}", message.getPayload());
throw new MessagingException("reject"); // How to gracefully reject consuming messages?
});
}
Upvotes: 0
Reputation: 1
MessageChannel messageChannel = createMessageChannel(channelName);
messageChannel.send(getMessageBuilder().apply(data));
public MessageChannel createMessageChannel(String channelName) {
return (MessageChannel) applicationContext.getBean(channelName);}
public Function<Object, Message<Object>> getMessageBuilder() {
return payload -> MessageBuilder
.withPayload(payload)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.build();}
Upvotes: 0
Reputation: 13083
I had a task where I did not know the topics in advance. I solved it by having one input channel which listens to all the topics I need.
Destination
The target destination of a channel on the bound middleware (e.g., the RabbitMQ exchange or Kafka topic). If the channel is bound as a consumer, it could be bound to multiple destinations and the destination names can be specified as comma-separated String values. If not set, the channel name is used instead.
So my configuration
spring:
cloud:
stream:
default:
consumer:
concurrency: 2
partitioned: true
bindings:
# inputs
input:
group: application_name_group
destination: topic-1,topic-2
content-type: application/json;charset=UTF-8
Then I defined one consumer which handles messages from all these topics.
@Component
@EnableBinding(Sink.class)
public class CommonConsumer {
private final static Logger logger = LoggerFactory.getLogger(CommonConsumer.class);
@StreamListener(target = Sink.INPUT)
public void consumeMessage(final Message<Object> message) {
logger.info("Received a message: \nmessage:\n{}", message.getPayload());
// Here I define logic which handles messages depending on message headers and topic.
// In my case I have configuration which forwards these messages to webhooks, so I need to have mapping topic name -> webhook URI.
}
}
Note, in your case it may not be a solution. I needed to forward messages to webhooks, so I could have configuration mapping.
I also thought about other ideas. 1) You kafka client consumer without Spring Cloud.
2) Create a predefined number of inputs, for example 50.
input-1
intput-2
...
intput-50
And then have a configuration for some of these inputs.
Related discussions
We use Spring Cloud 2.1.1 RELEASE
Upvotes: 0
Reputation: 1274
I ran into similar scenario recently and below is my sample of creating SubscriberChannels dynamically.
ConsumerProperties consumerProperties = new ConsumerProperties();
consumerProperties.setMaxAttempts(1);
BindingProperties bindingProperties = new BindingProperties();
bindingProperties.setConsumer(consumerProperties);
bindingProperties.setDestination(retryTopic);
bindingProperties.setGroup(consumerGroup);
bindingServiceProperties.getBindings().put(consumerName, bindingProperties);
SubscribableChannel channel = (SubscribableChannel)bindingTargetFactory.createInput(consumerName);
beanFactory.registerSingleton(consumerName, channel);
channel = (SubscribableChannel)beanFactory.initializeBean(channel, consumerName);
bindingService.bindConsumer(channel, consumerName);
channel.subscribe(consumerMessageHandler);
Upvotes: 3
Reputation: 3842
I had to do something similar for the Camel Spring Cloud Stream component.
Perhaps the Consumer code to bind a destination "really just a String
indicating the channel name" would be useful to you?
In my case I only bind a single destination, however I don't imagine it being much different conceptually for multiple destinations.
Below is the gist of it:
@Override
protected void doStart() throws Exception {
SubscribableChannel bindingTarget = createInputBindingTarget();
bindingTarget.subscribe(message -> {
// have your way with the received incoming message
});
endpoint.getBindingService().bindConsumer(bindingTarget,
endpoint.getDestination());
// at this point the binding is done
}
/**
* Create a {@link SubscribableChannel} and register in the
* {@link org.springframework.context.ApplicationContext}
*/
private SubscribableChannel createInputBindingTarget() {
SubscribableChannel channel = endpoint.getBindingTargetFactory()
.createInputChannel(endpoint.getDestination());
endpoint.getBeanFactory().registerSingleton(endpoint.getDestination(), channel);
channel = (SubscribableChannel) endpoint.getBeanFactory().initializeBean(channel,
endpoint.getDestination());
return channel;
}
See here for the full source for more context.
Upvotes: 0
Reputation: 4179
For the incoming messages, you can explicitly use BinderAwareChannelResolver
to dynamically resolve the destination. You can check this example where router
sink uses binder aware channel resolver.
Upvotes: -1