Sharafath Ahmed
Sharafath Ahmed

Reputation: 135

Camel-Kafka Zookeeper Exception

I am starting a camel-kafka consumer, where it takes all the data from topic and moves to a hashmap. The Jar version for camel-Kafka is 2.18.1

The cxfbeans.xml file is as follows:

    <route id="Test1" streamCache="true">
        <from uri="file:C:/data" />
        <split streaming="true">
            <tokenize token="\n" />
            <to uri="bean:proc1" />
            <to
                uri="kafka:localhost:9092?topic=Checking&amp;zookeeperHost=localhost&amp;zookeeperPort=2181&amp;serializerClass=kafka.serializer.StringEncoder" />
        </split>
    </route>

If i deploy the war i am getting the below exception:

Caused by: org.apache.camel.ResolveEndpointFailedException: Failed to resolve endpoint: kafka://localhost:9092?serializerClass=kafka.serializer.StringEncoder&topic=Checking&zookeeperHost=localhost&zookeeperPort=2181 due to: There are 2 parameters that couldn't be set on the endpoint. Check the uri if the parameters are spelt correctly and that they are properties of the endpoint. Unknown parameters=[{zookeeperHost=localhost, zookeeperPort=2181}]

I tried by removing teh ports and host of zookeeper ,it got deployed,but consumer is not consuming and placing in the processor.

Can anyone help me to resolve this issue. I degraded the version to, but i need to specify a de-serializer class in the to tag.

Upvotes: 0

Views: 1219

Answers (1)

Mobility
Mobility

Reputation: 3305

When using camel-kafka, there is a difference for version <=2.16 and >=2.17.

What you use is suitable for 2.16.

For >=2.17, see http://camel.apache.org/kafka.html 2.17 or newer part.

there is an example there:

from("direct:start").process(new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {
                        exchange.getIn().setBody("Test Message from Camel Kafka Component Final",String.class);
                        exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, 0);
                        exchange.getIn().setHeader(KafkaConstants.KEY, "1");
                    }
                }).to("kafka:localhost:9092?topic=test");

Upvotes: 1

Related Questions