Nikem
Nikem

Reputation: 5776

Spring Cloud Stream dynamic channels

I am using Spring Cloud Stream and want to programmatically create and bind channels. My use case is that during application startup I receive the dynamic list of Kafka topics to subscribe to. How can I then create a channel for each topic?

Upvotes: 6

Views: 8504

Answers (6)

BeMxself
BeMxself

Reputation: 1

Based on Sash's answer, I have written a demo that can dynamically register consumers.

  @Autowired
  void dynamicBinding(BindingService bindingService,
                      BindingServiceProperties bindingServiceProperties,
                      RabbitExtendedBindingProperties rabbitExtendedBindingProperties,
                      BindingTargetFactory channelFactory) {
    final var CONSUMER_NAME = "onRefresh";               // SCS Consumer Name
    final var RABBIT_EXCHANGE = "ouroboros.app.control"; // RabbitMQ Exchange Name

    // RabbitMQ Properties
    var rabbitConsumerProperties = new RabbitConsumerProperties();
    var consumerProperties = new ExtendedConsumerProperties<>(rabbitConsumerProperties);

    var rabbitBindingProperties = new RabbitBindingProperties();
    var bindingProperties = new BindingProperties();

    rabbitConsumerProperties.setExchangeType("fanout");
    rabbitConsumerProperties.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    rabbitConsumerProperties.setRequeueRejected(false);
    rabbitConsumerProperties.setRepublishToDlq(true);

    rabbitConsumerProperties.setAutoBindDlq(true);
    rabbitConsumerProperties.setDeclareDlx(true);
    rabbitConsumerProperties.setDeadLetterExchange("ouroboros.app.control.dlx");
    rabbitConsumerProperties.setDeadLetterExchangeType("topic");

    consumerProperties.populateBindingName(CONSUMER_NAME);

    bindingProperties.setDestination(RABBIT_EXCHANGE);
    bindingProperties.setConsumer(consumerProperties);

    rabbitBindingProperties.setConsumer(rabbitConsumerProperties);

    bindingServiceProperties.getBindings().put(CONSUMER_NAME, bindingProperties);
    rabbitExtendedBindingProperties.setBindings(Collections.singletonMap(CONSUMER_NAME, rabbitBindingProperties));

    // Channel Name same as SCS Consumer name
    var channel = (SubscribableChannel)channelFactory.createInput(CONSUMER_NAME);

    // bind consumer
    bindingService.bindConsumer(channel, CONSUMER_NAME);

    // subscribe channel
    channel.subscribe((message) -> {
      logger.info("onRefresh: {}", message.getPayload());
      throw new MessagingException("reject"); // How to gracefully reject consuming messages?
    });
  }

Upvotes: 0

Neslihan Kolukısa
Neslihan Kolukısa

Reputation: 1

MessageChannel messageChannel = createMessageChannel(channelName);
messageChannel.send(getMessageBuilder().apply(data));

public MessageChannel createMessageChannel(String channelName) {
return (MessageChannel) applicationContext.getBean(channelName);}

public Function<Object, Message<Object>> getMessageBuilder() {
return payload -> MessageBuilder
.withPayload(payload)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.build();}

Upvotes: 0

Yan Khonski
Yan Khonski

Reputation: 13083

I had a task where I did not know the topics in advance. I solved it by having one input channel which listens to all the topics I need.

https://docs.spring.io/spring-cloud-stream/docs/Brooklyn.RELEASE/reference/html/_configuration_options.html

Destination

The target destination of a channel on the bound middleware (e.g., the RabbitMQ exchange or Kafka topic). If the channel is bound as a consumer, it could be bound to multiple destinations and the destination names can be specified as comma-separated String values. If not set, the channel name is used instead.

So my configuration

spring:
  cloud:
    stream:
      default:
        consumer:
          concurrency: 2
          partitioned: true
      bindings:
        # inputs
        input:
          group: application_name_group
          destination: topic-1,topic-2
          content-type: application/json;charset=UTF-8

Then I defined one consumer which handles messages from all these topics.

@Component
@EnableBinding(Sink.class)
public class CommonConsumer {

    private final static Logger logger = LoggerFactory.getLogger(CommonConsumer.class);

    @StreamListener(target = Sink.INPUT)
    public void consumeMessage(final Message<Object> message) {
        logger.info("Received a message: \nmessage:\n{}", message.getPayload());
        // Here I define logic which handles messages depending on message headers and topic.
        // In my case I have configuration which forwards these messages to webhooks, so I need to have mapping topic name -> webhook URI.
    }
}

Note, in your case it may not be a solution. I needed to forward messages to webhooks, so I could have configuration mapping.

I also thought about other ideas. 1) You kafka client consumer without Spring Cloud.

2) Create a predefined number of inputs, for example 50.

input-1
intput-2
...
intput-50

And then have a configuration for some of these inputs.

Related discussions

We use Spring Cloud 2.1.1 RELEASE

Upvotes: 0

sash
sash

Reputation: 1274

I ran into similar scenario recently and below is my sample of creating SubscriberChannels dynamically.

    ConsumerProperties consumerProperties = new ConsumerProperties();
    consumerProperties.setMaxAttempts(1); 
    BindingProperties bindingProperties = new BindingProperties();
    bindingProperties.setConsumer(consumerProperties);
    bindingProperties.setDestination(retryTopic);
    bindingProperties.setGroup(consumerGroup);

    bindingServiceProperties.getBindings().put(consumerName, bindingProperties);
    SubscribableChannel channel = (SubscribableChannel)bindingTargetFactory.createInput(consumerName);
    beanFactory.registerSingleton(consumerName, channel);
    channel = (SubscribableChannel)beanFactory.initializeBean(channel, consumerName);
    bindingService.bindConsumer(channel, consumerName);
    channel.subscribe(consumerMessageHandler);

Upvotes: 3

Donovan Muller
Donovan Muller

Reputation: 3842

I had to do something similar for the Camel Spring Cloud Stream component. Perhaps the Consumer code to bind a destination "really just a String indicating the channel name" would be useful to you?

In my case I only bind a single destination, however I don't imagine it being much different conceptually for multiple destinations.

Below is the gist of it:

    @Override
    protected void doStart() throws Exception {
        SubscribableChannel bindingTarget = createInputBindingTarget();
        bindingTarget.subscribe(message -> {
            // have your way with the received incoming message
        });

        endpoint.getBindingService().bindConsumer(bindingTarget,
                endpoint.getDestination());

       // at this point the binding is done
    }

    /**
     * Create a {@link SubscribableChannel} and register in the
     * {@link org.springframework.context.ApplicationContext}
     */
    private SubscribableChannel createInputBindingTarget() {
        SubscribableChannel channel = endpoint.getBindingTargetFactory()
                .createInputChannel(endpoint.getDestination());
        endpoint.getBeanFactory().registerSingleton(endpoint.getDestination(), channel);
        channel = (SubscribableChannel) endpoint.getBeanFactory().initializeBean(channel,
                endpoint.getDestination());
        return channel;
    }

See here for the full source for more context.

Upvotes: 0

Ilayaperumal Gopinathan
Ilayaperumal Gopinathan

Reputation: 4179

For the incoming messages, you can explicitly use BinderAwareChannelResolver to dynamically resolve the destination. You can check this example where router sink uses binder aware channel resolver.

Upvotes: -1

Related Questions