Ponmani

Reputation: 1

Camel route sourced from seda is not routing to the Kafka destination endpoint

I have a simple Camel route that consumes from a Kafka topic, does some processing, and writes the result to another Kafka topic.

I put a seda endpoint in the route so that the Kafka consumer doesn't get blocked while the processing runs.

But after processing, Camel routes the message back to the source Kafka endpoint and not to the destination endpoint.

from("kafka:<source endpoint details>")
            .routeId("FromKafka")
            .log("@@@@@@@@:  ${body}")
            .to("seda:myseda?waitForTaskToComplete=Never");

from("seda:myseda")
            .routeId("sedaRoute")
            .process(myprocessor)
            .to("kafka:<destination endpoint details>");

The output payload is put back on the source Kafka topic. If I replace seda with direct, it works fine:

from("kafka:<source endpoint details>")
            .routeId("FromKafka")
            .log("@@@@@@@@:  ${body}")
            .to("direct:mydirect");

from("direct:mydirect")
            .routeId("sedaRoute")
            .process(myprocessor)
            .to("kafka:<destination endpoint details>");

I suspected that the Kafka exchange might be request-reply and that the response was being handed back to the source endpoint, so I tried adding "waitForTaskToComplete=Never" to the seda endpoint, but without success.

Any help will be much appreciated.

Upvotes: 0

Views: 928

Answers (2)

Sanditha Shetty

Reputation: 121

This can happen when your Kafka consumer and producer topics are different. By default, Apache Camel keeps the consumer's headers on the message when it reaches the producer, so the KafkaConstants.TOPIC header still names the source topic. To avoid this, you have to update the Kafka topic header. This can be done with the bridgeEndpoint option on the producer: if the option is true, KafkaProducer ignores the KafkaConstants.TOPIC header of the inbound message. Alternatively, you can set the KafkaConstants.TOPIC header directly for the producer.

This happens only for seda routes; it works perfectly fine with a direct route.
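
To make the header rule above concrete, here is a minimal plain-Java sketch of the topic selection as I understand it. It is a toy model, not Camel's actual code: the class TopicResolution, the method resolveTopic, and the topic names are made up for illustration, and the header key string is only assumed to mirror KafkaConstants.TOPIC.

```java
import java.util.HashMap;
import java.util.Map;

public class TopicResolution {

    // Assumption: stands in for the KafkaConstants.TOPIC header key.
    static final String TOPIC_HEADER = "kafka.TOPIC";

    // Hypothetical helper modelling the rule described above:
    // a topic header on the message wins over the endpoint's own topic,
    // unless bridgeEndpoint is true, in which case the header is ignored.
    static String resolveTopic(Map<String, Object> headers,
                               String endpointTopic,
                               boolean bridgeEndpoint) {
        if (!bridgeEndpoint) {
            Object fromHeader = headers.get(TOPIC_HEADER);
            if (fromHeader instanceof String && !((String) fromHeader).isEmpty()) {
                return (String) fromHeader;
            }
        }
        return endpointTopic;
    }

    public static void main(String[] args) {
        // Headers as left behind by the consumer: the topic header names the source.
        Map<String, Object> headers = new HashMap<>();
        headers.put(TOPIC_HEADER, "source-topic");

        // Header still present, no bridging: the message goes back to the source.
        System.out.println(resolveTopic(headers, "destination-topic", false));

        // bridgeEndpoint=true: the header is ignored.
        System.out.println(resolveTopic(headers, "destination-topic", true));

        // Header overwritten for the producer: the chosen topic is used.
        headers.put(TOPIC_HEADER, "destination-topic");
        System.out.println(resolveTopic(headers, "destination-topic", false));
    }
}
```

In the route itself, the two fixes would correspond to adding bridgeEndpoint=true to the options of the producer's kafka: URI, or to setting the KafkaConstants.TOPIC header to the destination topic before the final .to(...). Whether bridgeEndpoint is available depends on your Camel version, so check the camel-kafka documentation for the release you run.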

Upvotes: 1

timmlk

Reputation: 11

I think you need to set the exchange pattern to "in only". Like this:

.to(ExchangePattern.InOnly, "seda:myseda")

Upvotes: 1
