Reputation: 175
This question is similar to Spring Cloud Stream topic per message for different consumers but the difference is that I want multiple Sinks in one consumer springboot application and I want to do this by rabbitmq topic(which is by default in spring cloud stream). I am not able to figure out correct configuration or somethign wrong in code. I have 3 sinks/cosumers. consumer1 is default and every message goes there.
**Updated as suggested by Garry **
Comment: my Producer App has routing key='*.events' application.yml
spring:
cloud:
stream:
bindings:
output:
destination: my-exchange
rabbit:
bindings:
output:
producer:
routing-key-expression: headers['*.events']
application:
name: publisher-service
server:
port: 15010
Producer code snippet Comment:message is sent with routing key ="test.events" . I sm not sure of 2nd argument but i am assuming it is bindingrouting-key =test1.events.billing which means I want it to be delivered to billing consumer besides default consumer.
source.output().send(MessageBuilder.withPayload(eventRequest.getEventMessage())
.setHeader("*.events", "test1.events.billing")
.build());
Consumer configuration Comment: I want 3 queues assigned to exchange ="myexchange" . I am not sure if config is right. application.yml
spring:
cloud:
stream:
bindings:
defaultconsumer:
destination: my-exchange
group: queue1
billingconsumer:
destination: my-exchange
group: queue2
messageconsumer:
destination: my-exchange
group: queue3
rabbit:
bindings:
defaultconsumer:
consumer:
bindingRoutingKey: '*.events.#'
billingconsumer:
consumer:
bindingRoutingKey: test1.events.billing
messageconsumer:
consumer:
bindingRoutingKey: test2.events.messages
application:
name: subscriber-service
server:
port: 15020
Consumer code: IEventConsumer.java Comment: I am not sure the code below is right
public interface IEventConsumer {
String INPUT = "my-exchange";
@Input
SubscribableChannel defaultconsumer();
@Input
SubscribableChannel billingconsumer();
@Input
SubscribableChannel messageconsumer();
}
EventConsumer.java Comment: All Iwant from below is the message should not be received my messsageConsumer! But in reality it goes thru all these methods.
@StreamListener("defaultconsumer")
public void subscribe1(EventMessage eventMessage) {
logger.info(" DefaultEventConsumer received new event [" + eventMessage.toString() + "] ");
}
@StreamListener("billingconsumer")
public void subscribe2(EventMessage eventMessage) {
logger.info(" billingEventConsumer received new event [" + eventMessage.toString() + "] ");
}
@StreamListener("messageconsumer")
public void subscribe3(EventMessage eventMessage) {
logger.info(" messageEventConsumer received new event [" + eventMessage.toString() + "] ");
}
Apparently something is wrong above and I dont see this working .Any ideas?
Upvotes: 0
Views: 996
Reputation: 174554
@Input(INPUT)
SubscribableChannel defaultconsumer();
@Input(INPUT)
SubscribableChannel billingconsumer();
@Input(INPUT)
SubscribableChannel messageconsumer();
You are giving all three bindings the same name; just use @INPUT
and the method name will be used as the binding name.
And
@StreamListener("defaultconsumer")
etc.
EDIT
I just copied your code and it worked fine...
@SpringBootApplication
@EnableBinding({ IEventConsumer.class, Source.class })
public class So60879187Application {
private static final Logger logger = LoggerFactory.getLogger(So60879187Application.class);
public static void main(String[] args) {
SpringApplication.run(So60879187Application.class, args);
}
@StreamListener("defaultconsumer")
public void subscribe1(String eventMessage) {
logger.info(" DefaultEventConsumer received new event [" + eventMessage.toString() + "] ");
}
@StreamListener("billingconsumer")
public void subscribe2(String eventMessage) {
logger.info(" billingEventConsumer received new event [" + eventMessage.toString() + "] ");
}
@StreamListener("messageconsumer")
public void subscribe3(String eventMessage) {
logger.info(" messageEventConsumer received new event [" + eventMessage.toString() + "] ");
}
@Bean
public ApplicationRunner runner(MessageChannel output) {
return args -> output.send(MessageBuilder.withPayload("foo")
.setHeader("*.events", "test1.events.billing")
.build());
}
}
interface IEventConsumer {
String INPUT = "my-exchange";
@Input
SubscribableChannel defaultconsumer();
@Input
SubscribableChannel billingconsumer();
@Input
SubscribableChannel messageconsumer();
}
spring:
cloud:
stream:
bindings:
defaultconsumer:
destination: my-exchange
group: queue1
billingconsumer:
destination: my-exchange
group: queue2
messageconsumer:
destination: my-exchange
group: queue3
output:
destination: my-exchange
rabbit:
bindings:
defaultconsumer:
consumer:
bindingRoutingKey: '*.events.#'
billingconsumer:
consumer:
bindingRoutingKey: test1.events.billing
messageconsumer:
consumer:
bindingRoutingKey: test2.events.messages
output:
producer:
routing-key-expression: headers['*.events']
application:
name: subscriber-service
server:
port: 15020
and
2020-03-27 09:45:33.607 INFO 30366 --- [change.queue1-1] com.example.demo.So60879187Application
: DefaultEventConsumer received new event [foo]
2020-03-27 09:45:33.607 INFO 30366 --- [change.queue2-1] com.example.demo.So60879187Application
: billingEventConsumer received new event [foo]
EDIT2
Newer functional programming model equivalent...
@SpringBootApplication
public class So608791871Application {
private static final Logger logger = LoggerFactory.getLogger(So608791871Application.class);
public static void main(String[] args) {
SpringApplication.run(So608791871Application.class, args);
}
@Bean
public Consumer<String> defaultconsumer() {
return eventMessage ->
logger.info(" DefaultEventConsumer received new event [" + eventMessage.toString() + "] ");
}
@Bean
public Consumer<String> billingconsumer() {
return eventMessage ->
logger.info(" billingEventConsumer received new event [" + eventMessage.toString() + "] ");
}
@Bean
public Consumer<String> messageconsumer() {
return eventMessage ->
logger.info(" messageEventConsumer received new event [" + eventMessage.toString() + "] ");
}
private final DirectProcessor<Message<?>> output = DirectProcessor.create();
@Bean
public Supplier<Flux<Message<?>>> output() {
return () -> this.output;
}
@Bean
public ApplicationRunner runner() {
Message<String> msg1 = MessageBuilder.withPayload("foo")
.setHeader("*.events", "test1.events.billing")
.build();
Message<String> msg2 = MessageBuilder.withPayload("bar")
.setHeader("*.events", "test2.events.messages")
.build();
return args -> {
this.output.onNext(msg1);
this.output.onNext(msg2);
};
}
}
spring:
cloud:
function:
definition: defaultconsumer;billingconsumer;messageconsumer;output
stream:
bindings:
defaultconsumer-in-0:
destination: my-exchange
group: queue1
billingconsumer-in-0:
destination: my-exchange
group: queue2
messageconsumer-in-0:
destination: my-exchange
group: queue3
output-out-0:
destination: my-exchange
rabbit:
bindings:
defaultconsumer-in-0:
consumer:
bindingRoutingKey: '*.events.#'
billingconsumer-in-0:
consumer:
bindingRoutingKey: test1.events.billing
messageconsumer-in-0:
consumer:
bindingRoutingKey: test2.events.messages
output-out-0:
producer:
routing-key-expression: headers['*.events']
application:
name: subscriber-service
server:
port: 15020
and
2020-03-27 14:28:37.426 INFO 3646 --- [change.queue3-1] com.example.demo.So608791871Application
: messageEventConsumer received new event [bar]
2020-03-27 14:28:37.426 INFO 3646 --- [change.queue1-1] com.example.demo.So608791871Application
: DefaultEventConsumer received new event [foo]
2020-03-27 14:28:37.426 INFO 3646 --- [change.queue2-1] com.example.demo.So608791871Application
: billingEventConsumer received new event [foo]
2020-03-27 14:28:37.429 INFO 3646 --- [change.queue1-1] com.example.demo.So608791871Application
: DefaultEventConsumer received new event [bar]
Upvotes: 2