Alexxxx

Reputation: 67

Apache Camel 3.21.0 Kafka Consumer max.poll.interval.ms

In my Spring Boot project I use the camel-spring-boot-starter and camel-kafka-starter dependencies. I created a consumer of the form:

from("kafka:my-topic?brokers=myBroker&consumersCount=1&groupId=myGroupId&maxPollRecords=1&maxPollIntervalMs=3600000&autoCommitEnable=false&allowManualCommit=true")

This worked up to Camel 3.20.6. With the newer versions 3.21.0 and 3.21.1 I get this exception when the application starts:

org.apache.kafka.common.config.ConfigException: Invalid value 3600000 for configuration max.poll.interval.ms: Expected value to be a 32-bit integer, but it was a java.lang.Long
    at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:688)
    at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:490)
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:483)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:113)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:133)
    at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:630)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:664)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:645)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:625)
    at org.apache.camel.component.kafka.DefaultKafkaClientFactory.getConsumer(DefaultKafkaClientFactory.java:34)
    at org.apache.camel.component.kafka.KafkaFetchRecords.createConsumer(KafkaFetchRecords.java:245)
    at org.apache.camel.component.kafka.KafkaFetchRecords.createConsumerTask(KafkaFetchRecords.java:205)
    at org.apache.camel.support.task.ForegroundTask.run(ForegroundTask.java:93)
    at org.apache.camel.component.kafka.KafkaFetchRecords.run(KafkaFetchRecords.java:127)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

Judging by the exception, the problem does not seem to be directly related to Camel, but I don't know how to solve it. A related kafka-client problem is described here: https://github.com/logstash-plugins/logstash-input-kafka/issues/199. The check appears to be in

org/apache/kafka/common/config/ConfigDef.java
public static Object parseType(String name, Object value, Type type) {
        ...
                case INT:
                    if (value instanceof Integer) {
                        return value;
                    } else if (value instanceof String) {
                        return Integer.parseInt(trimmed);
                    } else {
                        throw new ConfigException(name, value, "Expected value to be a 32-bit integer, but it was a " + value.getClass().getName());
                    }
        ...
    }

but maybe it is Camel that is parsing it to a Long instead of an Integer.
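To make the three cases in the excerpt concrete, here is a minimal sketch that mirrors the INT branch with plain Java. It is a simplified stand-in, not Kafka's actual code: the class name `IntTypeCheckDemo`, the method `parseInt`, and the use of `IllegalArgumentException` in place of `ConfigException` are assumptions made so the example runs without any Kafka dependency.

```java
public class IntTypeCheckDemo {

    // Simplified stand-in for the INT branch of ConfigDef.parseType quoted above.
    // Hypothetical helper: it only mirrors the Integer / String / other cases.
    public static Object parseInt(String name, Object value) {
        if (value instanceof Integer) {
            return value;
        } else if (value instanceof String) {
            return Integer.parseInt(((String) value).trim());
        } else {
            throw new IllegalArgumentException("Invalid value " + value + " for configuration " + name
                    + ": Expected value to be a 32-bit integer, but it was a " + value.getClass().getName());
        }
    }

    public static void main(String[] args) {
        String key = "max.poll.interval.ms";
        System.out.println(parseInt(key, 3600000));   // an Integer is returned as-is
        System.out.println(parseInt(key, "3600000")); // a String is parsed to an Integer
        try {
            parseInt(key, 360L);                      // a Long is rejected regardless of its magnitude
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this sketch the check is on the Java type of the value, not on its size, so a small value such as 360 is rejected just like 3600000 when it arrives as a Long.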

I tried passing a smaller value, such as 360, but the problem remains. If I pass a String to bypass the problem, such as "3600000", the value is read as 0.

Has anyone solved this problem?

Upvotes: 0

Views: 761

Answers (1)

Gatusko

Reputation: 2608

I faced the same issue when I upgraded Camel to 3.21.0, and moving to 3.21.1 did not fix it.

In short, there is no fix on the Camel side.

In Camel's KafkaConfiguration (https://github.com/apache/camel/blob/camel-3.21.1/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java) the field is declared as:

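private Long maxPollIntervalMs;
// The @UriParam below annotates the next field in the class
// (its values match auto.offset.reset), not maxPollIntervalMs.
@UriParam(
    label = "consumer",
    defaultValue = "latest",
    enums = "latest,earliest,none"
)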

The field is a Long, but Kafka expects an Integer for max.poll.interval.ms:

            case INT:
                if (value instanceof Integer) {
                    return value;
                } else {
                    if (value instanceof String) {
                        return Integer.parseInt(trimmed);
                    }

                    throw new ConfigException(name, value, "Expected value to be a 32-bit integer, but it was a " + value.getClass().getName());

So, to make this work, you just have to fork it and change that parameter to use Integer instead of Long, if you need that parameter.
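The change described above comes down to handing Kafka an Integer rather than a Long. A minimal sketch of that narrowing on a plain `java.util.Properties` object, independent of Camel: the class `MaxPollIntervalNarrowing` and the method `narrow` are hypothetical names, and where such a conversion would be hooked into Camel is not covered here.

```java
import java.util.Properties;

public class MaxPollIntervalNarrowing {

    static final String KEY = "max.poll.interval.ms";

    // Replaces a Long stored under KEY with the equivalent Integer.
    // Math.toIntExact throws ArithmeticException if the value does not fit in 32 bits.
    public static Properties narrow(Properties props) {
        Object value = props.get(KEY);
        if (value instanceof Long) {
            props.put(KEY, Math.toIntExact((Long) value));
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(KEY, 3600000L); // the Long value that triggers the ConfigException
        narrow(props);
        System.out.println(props.get(KEY).getClass().getName()); // java.lang.Integer
    }
}
```

Using `Math.toIntExact` rather than a plain cast means a value that genuinely exceeds the 32-bit range fails loudly instead of being silently truncated.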

Upvotes: 1
