Reputation: 485
I'd like to use Spring Cloud Stream in this pipeline:
In other words, I'd like to route messages from Kafka to different RabbitMQ queues using parameters from incoming messages. I've tried to use RabbitMQ exchange with type headers. But I got the error below and messages had saved in all queues - the headers had been ignored
"Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'upstream.ecom' in vhost '/': received 'topic' but current is 'headers', class-id=40, method-id=10)"
This is my application.yaml
spring:
config:
activate:
on-profile: dev
kafka:
jaas:
enabled: true
properties:
security:
protocol: "SASL_PLAINTEXT"
sasl:
mechanism: PLAIN
jaas:
config: org.apache.kafka.common.security.plain.PlainLoginModule required username="client" password="client-secret";
cloud:
stream:
default-binder: rabbit
function:
definition: upstreamProcessor
bindings:
upstreamProcessor-in-0:
destination: upstream.ecom
group: ms-upstream-provider
binder: kafka
upstreamProcessor-out-0:
destination: upstream.ecom
binder: rabbit
kafka:
streams:
binder.applicationId: ms-upstream-provider-dev
binder:
brokers: localhost:19093, localhost:29093, localhost:39093
bindings:
upstreamProcessor-in-0:
consumer:
configuration:
key.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
binders:
rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: client
password: client-secret
virtual-host: /
kafka:
type: kafka
And here my function
@Bean
public Function<String, Message<String>> upstreamProcessor() {
return value -> MessageBuilder.withPayload(value)
.setHeader("x-destination", "34-0029-1/34-0029-1_DB")
.setHeader("x-type", "upstream.ecom")
.build();
}
Regarding RabbitMQ exchange and queue binding settings this message ought to be routed to the "34-0029-1/34-0029-1_DB.ecom" queue. But it was routed to all queues :(
May be I shouldn't have used headers routing but I don't understand how to emit to the particular queue from function using dynamic routing key.
Can you give a clue what is wrong with my code or may be you have an example RabbitMQ dynamic routing?
Thanks.
Upvotes: 0
Views: 185
Reputation: 174554
The error says it all
received 'topic' but current is 'headers'
By default, SCSt creates topic exchanges, your existing exchange is a headers exchange. You need to set the exchange-type
or declare-exchange
consumer property https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-rabbit.html#_rabbitmq_consumer_properties
exchangeType
The exchange type: direct, fanout, headers or topic for non-partitioned destinations and direct, headers or topic for partitioned destinations.
Default: topic.
declareExchange
Whether to declare the exchange for the destination.
Default: true.
Upvotes: 1