Reputation: 485
I'm trying to send messages from kafka's topic to multipe rabbitMQ queues using Kafka message key as a routing key for rabbitmq message. The pipeline looks like this: kafka's topic -> spring cloud stream application -> rabbitmq
RabbitMQ settings
Exchange type: direct. All queues are binded to the Exchange, routing keys and queues names are the same.
SCS config
cloud:
stream:
default-binder: rabbit
function:
definition: upstreamProcessor
bindings:
upstreamProcessor-in-0:
destination: here is the kafka topic
group: ms-upstream-provider
binder: kafka
upstreamProcessor-out-0:
destination: here is the rabbitMQ exchange name
binder: rabbit
rabbit.bindings.upstreamProcessor-out-0.producer:
exchangeType: direct
Using upstreamProcessor function I'm reading incoming messages, create new message and send to rabbitMQ.
But I don't understand how to add routing key to the each outgoing messages? I've got a lot of different queues (> 6000) and I want to set routing keys in code not in settings In documentation and on the site I didn't find how to programmaticaly set the routing key
Thanks in advance for your suggestions.
public Function<Message<String>, Message<JsonNode>> upstreamProcessor() {
return message -> {
try {
JsonNode node = mapper.readTree(message.getPayload());
String routingKey= message.getHeaders().get(KafkaHeaders.RECEIVED_KEY).toString();
return MessageBuilder.withPayload(node)
.setHeader("routingKey", routingKey)
.build();
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
};
}
Upvotes: 0
Views: 96
Reputation: 5914
You can set the routing key in the code as you have done, i.e., .setHeader("routingKey," routingKey).
Then you need to set the routingKeyExpression
on the producer binding via configuration.
spring.cloud.stream.rabbit.bindings.upstreamProcessor-out-0.producer.routing-key-expression=headers['routingKey']
.
This way any outbound publishing via the upstreamProcessor-out-0
binding will consult the routingKey
header as the routing key used by the AMQP outbound endpoint. Note that the header name could be anything - it doesn't have to be routingKey
.
Upvotes: 1