Reputation: 1
I have a simple camel route which consumes from a Kafka topic. Does some processing and writes back to another kafka topic.
I needed to do some processing in between . I used seda in the route so that the kafka consumer doesn't get blocked on processing.
But after processing, Camel routes the message back to the source kafka endpoint and not to the destination endpoint.
from("kafka:<source endpoint details>")
.routeId("FromKafka")
.log("@@@@@@@@: ${body}")
.to("seda:myseda?waitForTaskToComplete=Never");`
from("seda:myseda")
.routeId("sedaRoute")
.process(myprocessor)
.to("kafka:<destination endpoint details>"
The output payload is once again put in the source kafka topic. If I just replace seda with direct, it just works fine.
from("kafka:<source endpoint details>")
.routeId("FromKafka")
.log("@@@@@@@@: ${body}")
.to("direct:mydirect");`
from("direct:mydirect")
.routeId("sedaRoute")
.process(myprocessor)
.to("kafka:<destination endpoint details>"
I suspected Kafka might be a request-reply exchange and the response is given back to the source endpoint. Hence tried adding "waitForTaskToComplete=Never" to seda. But no success.
Any help will be much appreciated.
Upvotes: 0
Views: 928
Reputation: 121
Incase your kafka consumer and producer topics are different. Apache camel by default keeps the consumer headers for producer as well .To avoid this, use have the update the kafka headers topic. Can we done using , bridgeEndpoint option for producer. If the option is true, then KafkaProducer will ignore the KafkaConstants.TOPIC header setting of the inbound message. Or u can directly set headers of KafkaConstants.TOPIC for producer.
This happens only for seda routes. works perfectly fine with direct route
Upvotes: 1
Reputation: 11
I think you need to set the exchange pattern to "in only". Like this:
.to(ExchangePattern.InOnly,"seda:myseda")
Upvotes: 1