Reputation: 67
In my Spring Boot project with camel-kafka, I use the camel-spring-boot-starter and camel-kafka-starter dependencies. I created a consumer of the form:
from("kafka:my-topic?brokers=myBroker&consumersCount=1&groupId=myGroupId&maxPollRecords=1&maxPollIntervalMs=3600000&autoCommitEnable=false&allowManualCommit=true")
This worked up to Camel 3.20.6. With the newer versions 3.21.0 and 3.21.1, I get this exception when the application starts:
org.apache.kafka.common.config.ConfigException: Invalid value 3600000 for configuration max.poll.interval.ms: Expected value to be a 32-bit integer, but it was a java.lang.Long
at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:688)
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:490)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:483)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:113)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:133)
at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:630)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:664)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:645)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:625)
at org.apache.camel.component.kafka.DefaultKafkaClientFactory.getConsumer(DefaultKafkaClientFactory.java:34)
at org.apache.camel.component.kafka.KafkaFetchRecords.createConsumer(KafkaFetchRecords.java:245)
at org.apache.camel.component.kafka.KafkaFetchRecords.createConsumerTask(KafkaFetchRecords.java:205)
at org.apache.camel.support.task.ForegroundTask.run(ForegroundTask.java:93)
at org.apache.camel.component.kafka.KafkaFetchRecords.run(KafkaFetchRecords.java:127)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Searching for the exception suggests that the problem is not directly related to Camel, but I don't know how to solve it. This is a related issue involving kafka-client: https://github.com/logstash-plugins/logstash-input-kafka/issues/199. The failing check seems to be in
org/apache/kafka/common/config/ConfigDef.java
public static Object parseType(String name, Object value, Type type) {
    ...
    case INT:
        if (value instanceof Integer) {
            return value;
        } else if (value instanceof String) {
            return Integer.parseInt(trimmed);
        } else {
            throw new ConfigException(name, value, "Expected value to be a 32-bit integer, but it was a " + value.getClass().getName());
        }
    ...
}
but maybe it is Camel that is passing the value as a Long instead of an Integer.
I tried passing a smaller value, like 360, but the problem is the same. If I pass a String to bypass the problem, like "3600000", the value is read as 0.
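To illustrate why a smaller value such as 360 makes no difference, here is a minimal sketch that mirrors only the instanceof checks from the snippet quoted above. It is a simplified stand-in, not Kafka's actual code: the class name IntConfigCheck and the method parseInt32 are hypothetical, and IllegalArgumentException is used in place of ConfigException so the example runs without kafka-clients on the classpath.

```java
// Simplified stand-in for the INT branch of ConfigDef.parseType quoted above.
// NOT Kafka's code: IntConfigCheck and parseInt32 are hypothetical names, and
// IllegalArgumentException replaces Kafka's ConfigException.
final class IntConfigCheck {

    static Object parseInt32(String name, Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof Integer) {
            // Already the expected type: accepted as-is.
            return value;
        } else if (value instanceof String) {
            // Strings are parsed, so "3600000" is accepted.
            return Integer.parseInt(((String) value).trim());
        } else {
            // Anything else, including a Long that would fit in 32 bits, is rejected.
            throw new IllegalArgumentException("Invalid value " + value + " for configuration " + name
                    + ": Expected value to be a 32-bit integer, but it was a " + value.getClass().getName());
        }
    }

    public static void main(String[] args) {
        Object[] candidates = {3600000, "3600000", 3600000L, 360L};
        for (Object candidate : candidates) {
            try {
                Object parsed = parseInt32("max.poll.interval.ms", candidate);
                System.out.println(candidate.getClass().getSimpleName() + " accepted: " + parsed);
            } catch (IllegalArgumentException e) {
                System.out.println(candidate.getClass().getSimpleName() + " rejected: " + e.getMessage());
            }
        }
    }
}
```

The check looks at the runtime type of the value, not its magnitude, so under this sketch 360L is rejected for the same reason as 3600000L.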
Has anyone solved this problem?
Upvotes: 0
Views: 761
Reputation: 2608
I faced the same issue when I upgraded Camel to 3.21.0, and it is still there after moving to 3.21.1.
Short answer: there is no fix on the Camel side.
In Camel's KafkaConfiguration (https://github.com/apache/camel/blob/camel-3.21.1/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java), the field is declared as:
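private Long maxPollIntervalMs;
// Note: the @UriParam below appears to annotate the field that follows in the class, not maxPollIntervalMs.
@UriParam(
    label = "consumer",
    defaultValue = "latest",
    enums = "latest,earliest,none"
)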
It is a Long, and Kafka expects an Integer for max.poll.interval.ms:
case INT:
    if (value instanceof Integer) {
        return value;
    } else {
        if (value instanceof String) {
            return Integer.parseInt(trimmed);
        }
        throw new ConfigException(name, value, "Expected value to be a 32-bit integer, but it was a " + value.getClass().getName());
    }
So to make this work, if you need that parameter, the option is to fork the component and change that field to Integer instead of Long.
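The essence of that change is handing Kafka an Integer rather than a Long for this key. A minimal sketch of the conversion, independent of Camel, might look like the following. The class ConsumerPropsNormalizer and the method withIntProperty are hypothetical names for illustration only; they are not part of Camel or Kafka, and where such a conversion would be hooked into a fork is not shown here.

```java
import java.util.Properties;

// Hypothetical helper, not part of Camel or Kafka: converts a Long-valued
// property to an Integer so that an INT-typed config check would accept it.
final class ConsumerPropsNormalizer {

    static Properties withIntProperty(Properties source, String key) {
        // Work on a copy so the caller's Properties object is left untouched.
        Properties copy = new Properties();
        copy.putAll(source);

        Object value = copy.get(key);
        if (value instanceof Long) {
            // Math.toIntExact throws ArithmeticException if the value does not
            // fit in 32 bits, instead of silently truncating it.
            copy.put(key, Math.toIntExact((Long) value));
        }
        return copy;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("max.poll.interval.ms", 3600000L);

        Properties fixed = withIntProperty(props, "max.poll.interval.ms");
        System.out.println(fixed.get("max.poll.interval.ms").getClass().getName());
    }
}
```

Using Math.toIntExact rather than a plain cast is a deliberate choice in this sketch: a value too large for an int fails loudly rather than wrapping around to an unrelated timeout.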
Upvotes: 1