Vivek Misra

Reputation: 175

Spring Cloud Stream topic per message for different consumers (in one consumer app)

This question is similar to "Spring Cloud Stream topic per message for different consumers", but the difference is that I want multiple sinks in one consumer Spring Boot application, and I want to do this with a RabbitMQ topic exchange (which is the default in Spring Cloud Stream). I cannot figure out the correct configuration, or whether something is wrong in my code. I have 3 sinks/consumers. consumer1 is the default and every message should go there.

**Updated as suggested by Gary**

Comment: my producer app has routing key='*.events'. application.yml:

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: my-exchange
      rabbit:
        bindings:
          output:
            producer:
              routing-key-expression: headers['*.events']
  application:
    name: publisher-service
server:
  port: 15010

Producer code snippet. Comment: the message is sent with routing key "test.events". I am not sure about the 2nd argument, but I am assuming it is the binding routing key (test1.events.billing), which means I want the message delivered to the billing consumer in addition to the default consumer.

 source.output().send(MessageBuilder.withPayload(eventRequest.getEventMessage())
                    .setHeader("*.events", "test1.events.billing")
                    .build());

Consumer configuration. Comment: I want 3 queues bound to exchange "my-exchange". I am not sure if the config is right. application.yml:

spring:
  cloud:
      stream:
        bindings:
          defaultconsumer:
            destination: my-exchange
            group: queue1
          billingconsumer:
            destination: my-exchange
            group: queue2
          messageconsumer:
            destination: my-exchange
            group: queue3

        rabbit:
          bindings:
            defaultconsumer:
              consumer:
                bindingRoutingKey: '*.events.#'
            billingconsumer:
              consumer:
                bindingRoutingKey: test1.events.billing
            messageconsumer:
              consumer:
                bindingRoutingKey: test2.events.messages

  application:
    name: subscriber-service
server:
  port: 15020

Consumer code: IEventConsumer.java. Comment: I am not sure the code below is right.

public interface IEventConsumer {
     String INPUT = "my-exchange";

    @Input
    SubscribableChannel defaultconsumer();

    @Input
    SubscribableChannel billingconsumer();

    @Input
    SubscribableChannel messageconsumer();
}

EventConsumer.java. Comment: All I want is that the message should not be received by messageconsumer! But in reality it goes through all of these methods.



    @StreamListener("defaultconsumer")
    public void subscribe1(EventMessage eventMessage) {
        logger.info(" DefaultEventConsumer received new event [" + eventMessage.toString() + "] ");
    }


    @StreamListener("billingconsumer")
    public void subscribe2(EventMessage eventMessage) {
        logger.info(" billingEventConsumer received new event [" + eventMessage.toString() + "] ");
    }

    @StreamListener("messageconsumer")
    public void subscribe3(EventMessage eventMessage) {
        logger.info(" messageEventConsumer received new event [" + eventMessage.toString() + "] ");
    }

Apparently something is wrong above, and I don't see this working. Any ideas?

Upvotes: 0

Views: 996

Answers (1)

Gary Russell

Reputation: 174554

    @Input(INPUT)
    SubscribableChannel defaultconsumer();

    @Input(INPUT)
    SubscribableChannel billingconsumer();

    @Input(INPUT)
    SubscribableChannel messageconsumer();

You are giving all three bindings the same name; just use @Input with no value, and the method name will be used as the binding name.

And

@StreamListener("defaultconsumer")

etc.

EDIT

I just copied your code and it worked fine...

@SpringBootApplication
@EnableBinding({ IEventConsumer.class, Source.class })
public class So60879187Application {

    private static final Logger logger = LoggerFactory.getLogger(So60879187Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So60879187Application.class, args);
    }

    @StreamListener("defaultconsumer")
    public void subscribe1(String eventMessage) {
        logger.info(" DefaultEventConsumer received new event [" + eventMessage.toString() + "] ");
    }

    @StreamListener("billingconsumer")
    public void subscribe2(String eventMessage) {
        logger.info(" billingEventConsumer received new event [" + eventMessage.toString() + "] ");
    }

    @StreamListener("messageconsumer")
    public void subscribe3(String eventMessage) {
        logger.info(" messageEventConsumer received new event [" + eventMessage.toString() + "] ");
    }

    @Bean
    public ApplicationRunner runner(MessageChannel output) {
        return args -> output.send(MessageBuilder.withPayload("foo")
                .setHeader("*.events", "test1.events.billing")
                .build());
    }

}

interface IEventConsumer {
    String INPUT = "my-exchange";

    @Input
    SubscribableChannel defaultconsumer();

    @Input
    SubscribableChannel billingconsumer();

    @Input
    SubscribableChannel messageconsumer();

}

spring:
  cloud:
      stream:
        bindings:
          defaultconsumer:
            destination: my-exchange
            group: queue1
          billingconsumer:
            destination: my-exchange
            group: queue2
          messageconsumer:
            destination: my-exchange
            group: queue3
          output:
            destination: my-exchange

        rabbit:
          bindings:
            defaultconsumer:
              consumer:
                bindingRoutingKey: '*.events.#'
            billingconsumer:
              consumer:
                bindingRoutingKey: test1.events.billing
            messageconsumer:
              consumer:
                bindingRoutingKey: test2.events.messages
            output:
              producer:
                routing-key-expression: headers['*.events']

  application:
    name: subscriber-service
server:
  port: 15020

and

2020-03-27 09:45:33.607  INFO 30366 --- [change.queue1-1] com.example.demo.So60879187Application   :  DefaultEventConsumer received new event [foo]
2020-03-27 09:45:33.607  INFO 30366 --- [change.queue2-1] com.example.demo.So60879187Application   :  billingEventConsumer received new event [foo]


EDIT2

Newer functional programming model equivalent...

@SpringBootApplication
public class So608791871Application {

    private static final Logger logger = LoggerFactory.getLogger(So608791871Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So608791871Application.class, args);
    }

    @Bean
    public Consumer<String> defaultconsumer() {
        return eventMessage ->
                logger.info(" DefaultEventConsumer received new event [" + eventMessage.toString() + "] ");
    }

    @Bean
    public Consumer<String> billingconsumer() {
        return eventMessage ->
                logger.info(" billingEventConsumer received new event [" + eventMessage.toString() + "] ");
    }

    @Bean
    public Consumer<String> messageconsumer() {
        return eventMessage ->
                logger.info(" messageEventConsumer received new event [" + eventMessage.toString() + "] ");
    }

    private final DirectProcessor<Message<?>> output = DirectProcessor.create();

    @Bean
    public Supplier<Flux<Message<?>>> output() {
        return () -> this.output;
    }

    @Bean
    public ApplicationRunner runner() {
        Message<String> msg1 = MessageBuilder.withPayload("foo")
                .setHeader("*.events", "test1.events.billing")
                .build();
        Message<String> msg2 = MessageBuilder.withPayload("bar")
                .setHeader("*.events", "test2.events.messages")
                .build();
        return args -> {
            this.output.onNext(msg1);
            this.output.onNext(msg2);
        };
    }

}

spring:
  cloud:
    function:
      definition: defaultconsumer;billingconsumer;messageconsumer;output
    stream:
      bindings:
        defaultconsumer-in-0:
          destination: my-exchange
          group: queue1
        billingconsumer-in-0:
          destination: my-exchange
          group: queue2
        messageconsumer-in-0:
          destination: my-exchange
          group: queue3
        output-out-0:
          destination: my-exchange

      rabbit:
        bindings:
          defaultconsumer-in-0:
            consumer:
              bindingRoutingKey: '*.events.#'
          billingconsumer-in-0:
            consumer:
              bindingRoutingKey: test1.events.billing
          messageconsumer-in-0:
            consumer:
              bindingRoutingKey: test2.events.messages
          output-out-0:
            producer:
              routing-key-expression: headers['*.events']

  application:
    name: subscriber-service
server:
  port: 15020

and

2020-03-27 14:28:37.426  INFO 3646 --- [change.queue3-1] com.example.demo.So608791871Application :  messageEventConsumer received new event [bar]
2020-03-27 14:28:37.426  INFO 3646 --- [change.queue1-1] com.example.demo.So608791871Application :  DefaultEventConsumer received new event [foo]
2020-03-27 14:28:37.426  INFO 3646 --- [change.queue2-1] com.example.demo.So608791871Application :  billingEventConsumer received new event [foo]
2020-03-27 14:28:37.429  INFO 3646 --- [change.queue1-1] com.example.demo.So608791871Application :  DefaultEventConsumer received new event [bar]
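
For anyone wondering why the messages land where they do: the routing key is the value of the "*.events" header (that is what routing-key-expression: headers['*.events'] evaluates to), and the topic exchange compares it against each queue's bindingRoutingKey. In a topic pattern, * stands for exactly one dot-separated word and # for zero or more words. Below is a rough plain-Java sketch of that matching rule, just to reason about the config above. It is not the broker's implementation, and the class name TopicMatchSketch and its matches method are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of RabbitMQ or Spring): a simplified
// re-implementation of AMQP topic matching, for illustration only.
public class TopicMatchSketch {

    /** Returns true if routingKey satisfies the topic binding pattern. */
    public static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: match only if the key is exhausted too.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words, so try every split point.
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Pattern still expects a word but the key has none left.
            return false;
        }
        // '*' matches any single word; otherwise the words must be equal.
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        // Group name -> bindingRoutingKey, copied from the YAML above.
        Map<String, String> bindings = new LinkedHashMap<>();
        bindings.put("queue1", "*.events.#");
        bindings.put("queue2", "test1.events.billing");
        bindings.put("queue3", "test2.events.messages");

        String[] routingKeys = {"test1.events.billing", "test2.events.messages"};
        for (String key : routingKeys) {
            for (Map.Entry<String, String> b : bindings.entrySet()) {
                System.out.println(key + " -> " + b.getKey() + " ("
                        + b.getValue() + "): " + matches(b.getValue(), key));
            }
        }
    }
}
```

With these bindings, test1.events.billing matches queue1 and queue2, and test2.events.messages matches queue1 and queue3, which lines up with the log output above.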

Upvotes: 2
