Alexey Khudyakov
Alexey Khudyakov

Reputation: 485

Spring Cloud Stream from Kafka to Rabbit

I'm trying to send messages from kafka's topic to multipe rabbitMQ queues using Kafka message key as a routing key for rabbitmq message. The pipeline looks like this: kafka's topic -> spring cloud stream application -> rabbitmq

RabbitMQ settings

Exchange type: direct. All queues are binded to the Exchange, routing keys and queues names are the same.

SCS config

  cloud:
    stream:
      default-binder: rabbit
      function:
        definition: upstreamProcessor
      bindings:
        upstreamProcessor-in-0:
          destination: here is the kafka topic
          group: ms-upstream-provider
          binder: kafka
        upstreamProcessor-out-0:
          destination: here is the rabbitMQ exchange name
          binder: rabbit
      rabbit.bindings.upstreamProcessor-out-0.producer:
        exchangeType: direct

Using upstreamProcessor function I'm reading incoming messages, create new message and send to rabbitMQ.

But I don't understand how to add routing key to the each outgoing messages? I've got a lot of different queues (> 6000) and I want to set routing keys in code not in settings In documentation and on the site I didn't find how to programmaticaly set the routing key

Thanks in advance for your suggestions.

public Function<Message<String>, Message<JsonNode>> upstreamProcessor() {
    return message -> {
        try {
            JsonNode node = mapper.readTree(message.getPayload());
            String routingKey= message.getHeaders().get(KafkaHeaders.RECEIVED_KEY).toString();
            return MessageBuilder.withPayload(node)
                    .setHeader("routingKey", routingKey)
                    .build();
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    };
}

Upvotes: 0

Views: 96

Answers (1)

sobychacko
sobychacko

Reputation: 5914

You can set the routing key in the code as you have done, i.e., .setHeader("routingKey," routingKey). Then you need to set the routingKeyExpression on the producer binding via configuration. spring.cloud.stream.rabbit.bindings.upstreamProcessor-out-0.producer.routing-key-expression=headers['routingKey'].

This way any outbound publishing via the upstreamProcessor-out-0 binding will consult the routingKey header as the routing key used by the AMQP outbound endpoint. Note that the header name could be anything - it doesn't have to be routingKey.

Upvotes: 1

Related Questions