Logan

Reputation: 11

Class not found org.apache.kafka.clients.consumer.RangeAssignor in Spring XD module using Spring Kafka Integration with XML configuration

I am receiving the following exception when I try to construct a Kafka consumer in my XD module:

    Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702) ~[na:na]
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557) ~[na:na]
        at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:73) ~[na:na]
        at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:69) ~[na:na]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:284) ~[na:na]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:222) ~[na:na]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:179) ~[na:na]
        at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:204) ~[na:na]
        at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:173) ~[spring-context-4.1.7.RELEASE.jar:4.1.7.RELEASE]
        ... 27 common frames omitted
    Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.clients.consumer.RangeAssignor ClassNotFoundException exception occured
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:227) ~[na:na]
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:637) ~[na:na]
        ... 35 common frames omitted
    Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.RangeAssignor
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[na:1.8.0_77]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_77]
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) ~[na:1.8.0_77]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_77]
        at java.lang.Class.forName0(Native Method) ~[na:1.8.0_77]
        at java.lang.Class.forName(Class.java:348) ~[na:1.8.0_77]
        at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:332) ~[na:na]
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:225)

Here is my XML:

<int-kafka:message-driven-channel-adapter
    id="kafkaListener" listener-container="customKafkaMessageListenerContainer"
    auto-startup="false" phase="100" send-timeout="5000" channel="toTransformer"
    error-channel="errorChannel"/>

<bean id="containerProps"
    class="org.springframework.kafka.listener.config.ContainerProperties">
    <constructor-arg name="topicPartitions" ref="topicPartitionInitialOffset" />
</bean>

<bean id="topicPartitionInitialOffset"
    class="org.springframework.kafka.support.TopicPartitionInitialOffset">
    <constructor-arg index="0" name="topic" value="source-8-26"
        type="java.lang.String" />
    <constructor-arg index="1" name="partition" value="0"
        type="int" />
    <constructor-arg index="2" name="initialOffset" value="0"
        type="java.lang.Long" />
</bean>

<bean id="consumerFactory"
    class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
    <constructor-arg>
        <map>
            <entry key="bootstrap.servers" value="localhost:9092" />
            <entry key="group.id" value="test-consumer-group" />
            <entry key="enable.auto.commit" value="false" />
            <entry key="value.deserializer" value-type="java.lang.Class" value="org.apache.kafka.common.serialization.StringDeserializer">
            </entry>
            <entry key="key.deserializer" value-type="java.lang.Class" value="org.apache.kafka.common.serialization.StringDeserializer">
            </entry>
        </map>
    </constructor-arg>
</bean>

<bean id="customKafkaMessageListenerContainer"
    class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg name="consumerFactory" ref="consumerFactory">
    </constructor-arg>
    <constructor-arg name="containerProperties" ref="containerProps" />
</bean>

Dependencies:

<dependencies>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-kafka</artifactId>
        <version>2.0.1.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.0.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.0.1</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>1.0.3.RELEASE</version>
    </dependency>
</dependencies>

Does anyone know why I am getting this ClassNotFoundException? I checked the API for Kafka 0.10.0 and the class exists, so I'm not sure why it cannot be found. Could it have to do with how Spring XD loads classes? I know that Spring XD ships with Kafka 0.8.2 in its lib folder, so maybe Spring XD loads Kafka 0.8.2 first and is then unable to find the class?

Any insight would be greatly appreciated!

Upvotes: 0

Views: 1482

Answers (1)

Gary Russell

Reputation: 174719

I would expect it to work OK, because the module is loaded in its own classloader, which should find the right classes in the module's lib folder.
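To make that expectation concrete, here is a minimal sketch of a loader built over a folder of jars. It is not Spring XD's actual module loader, only an illustration using the JDK's `URLClassLoader`; the class name `ModuleLoaderSketch`, the helper methods, and the default `lib` path are all hypothetical.

```java
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;

public class ModuleLoaderSketch {

    /** Builds a loader over every .jar file found directly in libDir. */
    public static URLClassLoader loaderFor(File libDir, ClassLoader parent)
            throws MalformedURLException {
        List<URL> urls = new ArrayList<>();
        File[] jars = libDir.listFiles((dir, name) -> name.endsWith(".jar"));
        if (jars != null) { // null when libDir does not exist or is not a directory
            for (File jar : jars) {
                urls.add(jar.toURI().toURL());
            }
        }
        // URLClassLoader asks the parent first and only then searches its own URLs.
        return new URLClassLoader(urls.toArray(new URL[0]), parent);
    }

    /** Returns true if the loader, or one of its parents, can resolve the class name. */
    public static boolean canLoad(ClassLoader loader, String className) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical default; pass the module's lib folder as the first argument.
        File libDir = new File(args.length > 0 ? args[0] : "lib");
        String name = "org.apache.kafka.clients.consumer.RangeAssignor";
        try (URLClassLoader moduleLoader =
                loaderFor(libDir, ModuleLoaderSketch.class.getClassLoader())) {
            System.out.println("via module loader: " + canLoad(moduleLoader, name));
            System.out.println("via parent loader: " + canLoad(moduleLoader.getParent(), name));
        }
    }
}
```

If the class resolves through the module-style loader but not through its parent, the jar in the lib folder is fine and the question becomes which loader is actually used for the lookup at runtime.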

I usually debug this kind of problem by running the JVM with -verbose (or, explicitly, -verbose:class), which logs every class the JVM loads.
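A programmatic complement to the verbose log is to ask a given class loader where a class comes from. The sketch below uses only JDK calls; the class `ClassOrigin` and its `describe` method are hypothetical names for illustration, and it would need to run on the thread that starts the listener container to say anything about the failing lookup.

```java
import java.net.URL;
import java.security.CodeSource;

public class ClassOrigin {

    /** Describes which jar or directory the named class resolves to through the given loader. */
    public static String describe(String className, ClassLoader loader) {
        try {
            // initialize=false: resolve the class without running its static initializers
            Class<?> c = Class.forName(className, false, loader);
            CodeSource source = c.getProtectionDomain().getCodeSource();
            URL location = (source == null) ? null : source.getLocation();
            ClassLoader definedBy = c.getClassLoader();
            return className + " <- "
                    + (location == null ? "(no code source, e.g. a JDK class)" : location.toString())
                    + " loaded by "
                    + (definedBy == null ? "bootstrap loader" : definedBy.toString());
        } catch (ClassNotFoundException e) {
            return className + " NOT FOUND via " + loader;
        }
    }

    public static void main(String[] args) {
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        // Print the delegation chain, starting from the thread context class loader.
        for (ClassLoader cl = contextLoader; cl != null; cl = cl.getParent()) {
            System.out.println("loader: " + cl);
        }
        String name = args.length > 0
                ? args[0]
                : "org.apache.kafka.clients.consumer.RangeAssignor";
        System.out.println(describe(name, contextLoader));
    }
}
```

Calling `describe` for a class that exists in both client versions, for example `org.apache.kafka.clients.consumer.KafkaConsumer`, should show which kafka-clients jar is being picked up.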

Note: if you are using Kafka for the XD transport, Spring XD only supports the 0.8.x.x client - the follow-on projects (Spring Cloud Stream/Spring Cloud Data Flow) will support the newer Kafka clients. The first milestone of version 1.1 supporting the new clients was announced last week.

Upvotes: 0
