Reputation: 3254
I was trying to test Camel integration with Kafka as explained here.
The following is my code:
public class KafkaTest {
    public static void main(String args[]) throws Exception {
        CamelContext context = new DefaultCamelContext();
        context.addRoutes(new RouteBuilder() {
            public void configure() {
                from("kafka:test?zkConnect=localhost:2181&metadataBrokerList=localhost:9092")
                    .process(new Processor() {
                        @Override
                        public void process(Exchange exchange) throws Exception {
                            System.out.println(exchange.getIn().getBody());
                        }
                    })
                    .end();
            }
        });
        context.start();
        while (true) {
        }
    }
}
However, I am getting the following error:
Exception in thread "main" org.apache.camel.FailedToCreateRouteException: Failed to create route route1: Route(route1)[[From[kafka:test?zkConnect=localhost:2181&... because of Failed to resolve endpoint: kafka://test?amp%3BmetadataBrokerList=localhost%3A9092&zkConnect=localhost%3A2181 due to: Failed to resolve endpoint: kafka://test?amp%3BmetadataBrokerList=localhost%3A9092&zkConnect=localhost%3A2181 due to: There are 2 parameters that couldn't be set on the endpoint. Check the uri if the parameters are spelt correctly and that they are properties of the endpoint.
Unknown parameters=[{metadataBrokerList=localhost:9092, zkConnect=localhost:2181}]
Please suggest what could be missing.
Upvotes: 2
Views: 5807
Reputation: 2417
Have you tried using the endpoint class directly?
Something like:
public static KafkaEndpoint endpoint(String host, String port, String topic, String offset, String groupId) {
    String endpointUri = "kafka://" + host + ":" + port;
    KafkaEndpoint endpoint = new DefaultCamelContext().getEndpoint(endpointUri, KafkaEndpoint.class);
    endpoint.getConfiguration().setTopic(topic);
    endpoint.getConfiguration().setKeyDeserializer("org.apache.kafka.common.serialization.StringDeserializer");
    endpoint.getConfiguration().setValueDeserializer("org.apache.kafka.common.serialization.StringDeserializer");
    endpoint.getConfiguration().setAutoOffsetReset(offset);
    endpoint.getConfiguration().setGroupId(groupId);
    return endpoint;
}
PollingConsumer consumer = endpoint.createPollingConsumer();
or
new RouteBuilder() {
    public void configure() {
        from(endpoint)
            .process(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    System.out.println(exchange.getIn().getBody());
                }
            })
            .end();
    }
}
Upvotes: 0
Reputation: 22279
You should use the correct parameter names, as given in the official documentation:
from("kafka:localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181")
The version you are referring to, described in the wiki on GitHub, was contributed to Apache and has changed somewhat since.
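As a rough sketch of how those parameters fit together, the URI can be assembled from its parts in plain Java. The class and method names here (KafkaUriSketch, buildUri) are made up for illustration and are not part of Camel. Only the parameter names topic, zookeeperHost and zookeeperPort come from the URI above, and whether they apply depends on the camel-kafka version you are running.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not a Camel class: it only builds the URI string.
public class KafkaUriSketch {

    // Assembles "kafka:<brokers>?topic=...&zookeeperHost=...&zookeeperPort=...".
    // The parameter names are taken from the URI shown above; check them
    // against the documentation for your camel-kafka version.
    static String buildUri(String brokers, String topic, String zkHost, int zkPort) {
        // LinkedHashMap keeps the parameters in insertion order.
        Map<String, String> params = new LinkedHashMap<>();
        params.put("topic", topic);
        params.put("zookeeperHost", zkHost);
        params.put("zookeeperPort", Integer.toString(zkPort));

        // Join with a plain "&", never the HTML-escaped "&amp;".
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> e : params.entrySet()) {
            query.add(e.getKey() + "=" + e.getValue());
        }
        return "kafka:" + brokers + "?" + query;
    }

    public static void main(String[] args) {
        System.out.println(buildUri("localhost:9092", "test", "localhost", 2181));
    }
}
```

The resulting string can then be passed to from(...) in the route, which keeps the broker address and the query parameters in one place if they need to change later.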
Upvotes: 1