Reputation: 781
What is the best way to handle messages with unknown or invalid routing keys in an exchange? In my case, I send all my messages to the same exchange, and each message is routed to the corresponding queue based on its routing key. Here's my configuration (I'm using Spring Cloud Stream):
spring.cloud.stream.bindings.output.destination: my-exchange
spring.cloud.stream.rabbit.bindings.output.producer.routingKeyExpression: payload.type
spring.cloud.stream.bindings.input-type1.destination: my-exchange # Exchange
spring.cloud.stream.bindings.input-type1.group: input.type1 # Queue 1
spring.cloud.stream.bindings.input-type2.destination: my-exchange # Exchange
spring.cloud.stream.bindings.input-type2.group: input.type2 # Queue 2
spring.cloud.stream.rabbit.bindings.input-type1.consumer.bindingRoutingKey: FOO
spring.cloud.stream.rabbit.bindings.input-type2.consumer.bindingRoutingKey: BAR
Now, what happens if I send a message with payload.type='ANY'? Obviously, this message won't be retrieved by any consumer and will remain inside the exchange, but what is the best way to keep track of these "unknown" messages? Can I use a DLQ for this?
Thanks!
Upvotes: 0
Views: 1638
Reputation: 174574
"will remain inside the exchange"
No; exchanges don't "hold" messages, they are simply routers.
Unroutable messages are discarded by default.
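To make the "router, not a store" point concrete, here is a minimal plain-Java sketch of how a direct exchange behaves. It is only a toy model, not RabbitMQ or Spring code: the class DirectExchangeSketch, its methods, and the queue names queue-1 and queue-2 are hypothetical, while the binding keys FOO and BAR come from the question.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (hypothetical): a direct exchange only looks up bindings;
// it has no storage of its own for messages.
public class DirectExchangeSketch {
    private final Map<String, String> queueByBindingKey = new HashMap<>();
    private final Map<String, List<String>> messagesByQueue = new HashMap<>();

    // Bind a queue to the exchange under an exact-match binding key.
    public void bind(String bindingKey, String queueName) {
        queueByBindingKey.put(bindingKey, queueName);
        messagesByQueue.putIfAbsent(queueName, new ArrayList<>());
    }

    // Returns true if the message reached a queue, false if it was dropped.
    public boolean publish(String routingKey, String body) {
        String queueName = queueByBindingKey.get(routingKey);
        if (queueName == null) {
            return false; // no matching binding: nothing keeps the message
        }
        messagesByQueue.get(queueName).add(body);
        return true;
    }

    public List<String> messagesIn(String queueName) {
        return messagesByQueue.getOrDefault(queueName, new ArrayList<>());
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("FOO", "queue-1");
        exchange.bind("BAR", "queue-2");

        System.out.println("FOO routed: " + exchange.publish("FOO", "first"));
        System.out.println("ANY routed: " + exchange.publish("ANY", "second"));
        System.out.println("queue-1 holds: " + exchange.messagesIn("queue-1"));
    }
}
```

In this model the message published with the key ANY is stored nowhere once publish returns, which is the default behaviour described above.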
You can configure the binding to return unroutable messages.
See Error Channels.
Returns are asynchronous.
In the upcoming 3.1 release, you can wait on a future to determine whether the message was sent successfully or not. See Publisher Confirms.
If the message is unroutable, the correlation data's returnedMessage property is set.
The framework uses the mandatory feature mentioned in another answer.
EDIT
Here is an example:
spring.rabbitmq.publisher-returns: true
spring.cloud.stream.bindings.output.destination: my-exchange
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression: headers['rk']
spring.cloud.stream.bindings.output.producer.error-channel-enabled: true
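import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.amqp.support.ReturnedAmqpMessageException;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication
@EnableBinding(Source.class)
public class So65134452Application {

    public static void main(String[] args) {
        SpringApplication.run(So65134452Application.class, args);
    }

    // Sends one message whose routing key matches no binding.
    @Bean
    public ApplicationRunner runner(MessageChannel output) {
        return args -> {
            output.send(MessageBuilder.withPayload("foo")
                    .setHeader("rk", "invalid")
                    .build());
        };
    }

    @Autowired
    RabbitTemplate template;

    // Queue that collects the returned (unroutable) messages.
    @Bean
    public Queue unroutable() {
        return new Queue("unroutable.messages");
    }

    // Returned messages arrive on the error channel; forward them to the queue
    // above through the default exchange, using the queue name as routing key.
    @ServiceActivator(inputChannel = "errorChannel")
    public void error(Message<?> error) {
        if (error.getPayload() instanceof ReturnedAmqpMessageException) {
            this.template.send(unroutable().getName(),
                    ((ReturnedAmqpMessageException) error.getPayload()).getAmqpMessage());
        }
    }
}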
Upvotes: 1
Reputation: 2440
You could use the mandatory property for your publishers.
When a published message cannot be routed to any queue, and the publisher set the mandatory message property to true, the message will be returned to it.
The publisher must have a returned message handler set up in order to handle the return (e.g. by logging an error or retrying with a different exchange).
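As a rough illustration of that contract, here is a small plain-Java sketch. It does not use the RabbitMQ client: MandatoryPublishSketch and its methods are hypothetical names, and the return handler is called synchronously here for simplicity, whereas real returns from the broker arrive asynchronously.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;

// Toy model (hypothetical) of publishing with the mandatory flag.
public class MandatoryPublishSketch {
    private final Set<String> boundKeys = new HashSet<>();
    private final List<String> delivered = new ArrayList<>();
    // Default handler does nothing, like a publisher with no handler set up.
    private BiConsumer<String, String> returnHandler = (routingKey, body) -> { };

    public void bind(String bindingKey) {
        boundKeys.add(bindingKey);
    }

    // Register the callback that receives (routingKey, body) of returned messages.
    public void onReturn(BiConsumer<String, String> handler) {
        this.returnHandler = handler;
    }

    public void publish(String routingKey, String body, boolean mandatory) {
        if (boundKeys.contains(routingKey)) {
            delivered.add(body);                    // routable: delivered
        } else if (mandatory) {
            returnHandler.accept(routingKey, body); // unroutable and mandatory: returned
        }
        // unroutable and not mandatory: silently dropped
    }

    public List<String> delivered() {
        return delivered;
    }

    public static void main(String[] args) {
        MandatoryPublishSketch broker = new MandatoryPublishSketch();
        broker.bind("FOO");
        broker.onReturn((routingKey, body) ->
                System.out.println("returned: key=" + routingKey + ", body=" + body));

        broker.publish("FOO", "routable", true);
        broker.publish("ANY", "unroutable", true);
        broker.publish("ANY", "dropped", false);
    }
}
```

The handler is the place to log the message or republish it somewhere you can inspect it later, as the quoted text suggests.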
Upvotes: 2