Reputation: 135
I am setting up a camel-kafka consumer that reads all the data from a topic and stores it in a HashMap. The camel-kafka JAR version is 2.18.1.
The cxfbeans.xml file is as follows:
<route id="Test1" streamCache="true">
    <from uri="file:C:/data" />
    <split streaming="true">
        <tokenize token="\n" />
        <to uri="bean:proc1" />
        <to uri="kafka:localhost:9092?topic=Checking&amp;zookeeperHost=localhost&amp;zookeeperPort=2181&amp;serializerClass=kafka.serializer.StringEncoder" />
    </split>
</route>
When I deploy the WAR, I get the following exception:
Caused by: org.apache.camel.ResolveEndpointFailedException: Failed to resolve endpoint: kafka://localhost:9092?serializerClass=kafka.serializer.StringEncoder&topic=Checking&zookeeperHost=localhost&zookeeperPort=2181 due to: There are 2 parameters that couldn't be set on the endpoint. Check the uri if the parameters are spelt correctly and that they are properties of the endpoint. Unknown parameters=[{zookeeperHost=localhost, zookeeperPort=2181}]
I tried removing the ZooKeeper host and port options, and the WAR deployed, but the consumer does not consume anything or pass messages to the processor.
Can anyone help me resolve this issue? I also tried downgrading the version, but then I need to specify a de-serializer class in the to tag.
Upvotes: 0
Views: 1219
Reputation: 3305
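The camel-kafka endpoint options differ between versions 2.16 and older and versions 2.17 and newer.
The URI you are using, with zookeeperHost and zookeeperPort, follows the 2.16 syntax, which is why 2.18.1 reports those two parameters as unknown.
For 2.17 and newer, see the "2.17 or newer" part of http://camel.apache.org/kafka.html.
There is an example there: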
from("direct:start").process(new Processor() {
    @Override
    public void process(Exchange exchange) throws Exception {
        exchange.getIn().setBody("Test Message from Camel Kafka Component Final", String.class);
        exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, 0);
        exchange.getIn().setHeader(KafkaConstants.KEY, "1");
    }
}).to("kafka:localhost:9092?topic=test");
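Applied to the route in the question, the 2.17+ syntax might look like the sketch below. It assumes camel-kafka 2.18.1, drops the two ZooKeeper options the exception complains about, and swaps kafka.serializer.StringEncoder (a class from the older Kafka client) for org.apache.kafka.common.serialization.StringSerializer. The second route is only a guess at the consuming side: the route id "Test1Consumer", the groupId value "checkingGroup", and the bean name "mapCollector" are hypothetical placeholders, not names taken from the question.

```xml
<camelContext xmlns="http://camel.apache.org/schema/spring">

    <!-- Producer: the original route with the ZooKeeper options removed -->
    <route id="Test1" streamCache="true">
        <from uri="file:C:/data" />
        <split streaming="true">
            <tokenize token="\n" />
            <to uri="bean:proc1" />
            <!-- serializerClass now names a serializer from the Kafka Java client -->
            <to uri="kafka:localhost:9092?topic=Checking&amp;serializerClass=org.apache.kafka.common.serialization.StringSerializer" />
        </split>
    </route>

    <!-- Consumer (assumed): reads the same topic and hands each message to a bean.
         groupId and the bean name are placeholders chosen for illustration. -->
    <route id="Test1Consumer">
        <from uri="kafka:localhost:9092?topic=Checking&amp;groupId=checkingGroup&amp;autoOffsetReset=earliest&amp;valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer" />
        <to uri="bean:mapCollector" />
    </route>

</camelContext>
```

The String serializer and deserializer are, as far as the 2.17+ documentation indicates, the defaults, so both class options can probably be left out. They are spelled out here only to show where a custom class would go. autoOffsetReset=earliest makes a new consumer group start from the beginning of the topic, which matches the stated goal of reading all the data.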
Upvotes: 1