Reputation: 21
I would like to have a binding that connects to a specific queue/topic and routes to the right function based on a specific header entry.
I could not find any example for this case. I have tried several approaches, but none of them worked.
For example, this did not work:
spring:
  cloud:
    function:
      routing:
        enabled: true
    stream:
      function:
        routing:
          enabled: true
        definition: myConsumer;myOtherConsumer;
      bindings:
        myConsumer-in-0:
          destination: myTopic
          group: myGroup
          binder: myBroker
          routing-expression: "headers['MyRoutingInfo'] == 'even' ? 'myEvenConsumer' : 'myOddConsumer'"
        myOtherConsumer-in-0: # without specific routing
Any concrete example is appreciated.
Upvotes: 0
Views: 2170
Reputation: 1
You do not actually need to set spring.cloud.stream.function.routing.enabled=true in the application.properties file to make routing work: routing is enabled automatically as soon as you provide the routing-expression property. See the Spring Cloud Stream documentation.
Upvotes: 0
Reputation: 1893
With routing enabled, the input binding is created by default for a function named functionRouter (hence functionRouter-in-0 in the config below).
As per docs:
The RoutingFunction is registered in FunctionCatalog under the name functionRouter. For simplicity and consistency you can also refer to RoutingFunction.FUNCTION_NAME constant.
The config below should work:
spring:
  cloud:
    stream:
      function:
        definition: functionRouter;
        routing:
          enabled: true
      kafka:
        binder:
          brokers:
            - localhost:9092
      bindings:
        functionRouter-in-0:
          destination: my.topic
          group: my.topic.group
    function:
      routing-expression: "headers['type'] == 'even' ? 'evenConsumer' : 'oddConsumer'"
You don't need to add evenConsumer and oddConsumer to the function definition; they only need to exist as beans so the router can look them up by name.
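To make the routing expression concrete, here is a plain-Java sketch of what header-based dispatch amounts to. It is only a model and not Spring's RoutingFunction: the class HeaderRoutingSketch, its catalog map, the log list, and the resolveTarget method are hypothetical names introduced for illustration. Only the header key 'type' and the function names evenConsumer and oddConsumer come from the config above. The sketch assumes that a missing header falls through to oddConsumer, as the ternary in the expression would.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Plain-Java model of header-based routing; NOT Spring's RoutingFunction.
final class HeaderRoutingSketch {

    // Hypothetical stand-in for the function catalog: bean name -> consumer.
    private final Map<String, Consumer<byte[]>> catalog = new HashMap<>();

    // Records what each consumer received, so the behaviour is observable.
    final List<String> log = new ArrayList<>();

    HeaderRoutingSketch() {
        catalog.put("evenConsumer", p -> log.add("even got: " + text(p)));
        catalog.put("oddConsumer", p -> log.add("odd got: " + text(p)));
    }

    // Mirrors: headers['type'] == 'even' ? 'evenConsumer' : 'oddConsumer'
    // A missing or different header value resolves to oddConsumer.
    static String resolveTarget(Map<String, ?> headers) {
        return "even".equals(headers.get("type")) ? "evenConsumer" : "oddConsumer";
    }

    // Resolve the target name from the headers, look it up, and invoke it.
    void route(Map<String, ?> headers, byte[] payload) {
        String target = resolveTarget(headers);
        Consumer<byte[]> consumer = catalog.get(target);
        if (consumer == null) {
            throw new IllegalStateException("No function named " + target);
        }
        consumer.accept(payload);
    }

    private static String text(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        HeaderRoutingSketch router = new HeaderRoutingSketch();
        router.route(Map.of("type", "even"), "2".getBytes(StandardCharsets.UTF_8));
        router.route(Map.of("type", "odd"), "3".getBytes(StandardCharsets.UTF_8));
        router.log.forEach(System.out::println); // prints "even got: 2" then "odd got: 3"
    }
}
```

The point of the sketch is that the target functions are only looked up by name at dispatch time, which is why a single input binding on the router is enough and the targets need no bindings of their own.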
Upvotes: 0
Reputation: 21
I finally found a way to achieve my goal, but I'm not sure whether this is THE way to do it:
spring:
  cloud:
    function:
      routing:
        enabled: true
      routing-expression: "headers['MyRouting'] == 'odd' ? 'oddConsumer' : 'evenConsumer'"
    stream:
      function:
        definition: myConsumer;myOtherConsumer;
      bindings:
        myConsumer-in-0:
          destination: myTopic
          group: myGroup
          binder: myBroker
        myOtherConsumer-in-0: # without specific routing
with the following beans:
@Bean
public Consumer<Message<byte[]>> myConsumer(final RoutingFunction routingFunction) {
    return message -> {
        LOG.info("Sending to routingFunction");
        routingFunction.apply(message);
    };
}

@Bean
public Consumer<byte[]> evenConsumer() {
    return (payload) -> LOG.info("even got: {}", new String(payload));
}

@Bean
public Consumer<byte[]> oddConsumer() {
    return (payload) -> LOG.info("odd got: {}", new String(payload));
}
Upvotes: 1
Reputation: 174554
Consumers don't "route" messages; they consume from queues. Producers route messages using spring.cloud.stream.rabbit.bindings.producer-out-0.producer.routing-key-expression.
Upvotes: 0