Alexey Khudyakov

Reputation: 485

Spring Cloud Stream From Kafka to RabbitMQ dynamic queues

I'd like to use Spring Cloud Stream in this pipeline:

In other words, I'd like to route messages from Kafka to different RabbitMQ queues using parameters from the incoming messages. I tried using a RabbitMQ exchange of type headers, but I got the error below, and messages were delivered to all queues - the headers were ignored.

"Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'upstream.ecom' in vhost '/': received 'topic' but current is 'headers', class-id=40, method-id=10)"

This is my application.yaml

spring:
  config:
    activate:
      on-profile: dev
  kafka:
    jaas:
      enabled: true
    properties:
      security:
        protocol: "SASL_PLAINTEXT"
      sasl:
        mechanism: PLAIN
        jaas:
          config: org.apache.kafka.common.security.plain.PlainLoginModule required username="client" password="client-secret";
  cloud:
    stream:
      default-binder: rabbit
      function:
        definition: upstreamProcessor
      bindings:
        upstreamProcessor-in-0:
          destination: upstream.ecom
          group: ms-upstream-provider
          binder: kafka
        upstreamProcessor-out-0:
          destination: upstream.ecom
          binder: rabbit
      kafka:
        streams:
          binder.applicationId: ms-upstream-provider-dev
        binder:
          brokers: localhost:19093, localhost:29093, localhost:39093
        bindings:
          upstreamProcessor-in-0:
            consumer:
              configuration:
                key.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
                spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
                value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
                spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
      binders:
        rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: client
                password: client-secret
                virtual-host: /
        kafka:
          type: kafka

And here is my function:

@Bean
public Function<String, Message<String>> upstreamProcessor() {

    return value -> MessageBuilder.withPayload(value)
            .setHeader("x-destination", "34-0029-1/34-0029-1_DB")
            .setHeader("x-type", "upstream.ecom")
            .build();
}

According to my RabbitMQ exchange and queue binding settings, this message ought to have been routed to the "34-0029-1/34-0029-1_DB.ecom" queue. But it was routed to all queues :(

Maybe I shouldn't have used headers routing, but I don't understand how to emit to a particular queue from the function using a dynamic routing key.

Can you give me a clue about what is wrong with my code, or perhaps an example of RabbitMQ dynamic routing?

Thanks.

Upvotes: 0

Views: 185

Answers (1)

Gary Russell

Reputation: 174554

The error says it all:

received 'topic' but current is 'headers'

By default, Spring Cloud Stream creates topic exchanges; your existing exchange is a headers exchange. You need to set the exchange-type or declare-exchange property: https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-rabbit.html#_rabbitmq_consumer_properties

exchangeType

The exchange type: direct, fanout, headers or topic for non-partitioned destinations and direct, headers or topic for partitioned destinations.

Default: topic.
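Applied to the question's YAML, this property could look like the sketch below. Note that the out-binding publishes to the exchange, so the producer-side form of the property is shown here; the binding name is taken from the question, and choosing headers to match the already-existing exchange is an assumption:

```yaml
spring:
  cloud:
    stream:
      rabbit:
        bindings:
          upstreamProcessor-out-0:
            producer:
              # Match the type of the pre-existing exchange so the
              # declaration no longer conflicts (PRECONDITION_FAILED)
              exchange-type: headers
              # Alternatively, skip declaring it at all and just publish:
              # declare-exchange: false
```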

declareExchange

Whether to declare the exchange for the destination.

Default: true.
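For the dynamic-routing-key alternative the question asks about, the RabbitMQ binder's routing-key-expression producer property can compute the routing key per message with a SpEL expression against the outgoing message. A minimal sketch, assuming a topic exchange and reusing the x-destination header set in the question's function:

```yaml
spring:
  cloud:
    stream:
      rabbit:
        bindings:
          upstreamProcessor-out-0:
            producer:
              # Assumption: a topic exchange, so routing keys (not headers)
              # decide which queue receives each message
              exchange-type: topic
              # Routing key evaluated per message from its headers
              routing-key-expression: headers['x-destination']
```

With this, each queue would be bound to the exchange with a routing-key pattern instead of header-match arguments.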

Upvotes: 1
