Columb1a
Columb1a

Reputation: 491

SpringXD Error when deploying Stream: StringDeserializer class could not be found

I'm configuring my custom kafka source and I'm getting an error related to the value.deserializer kafka property.

This is the config that I have:

<!--Consumer -->
<bean id="container1"
    class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                    <entry key="enable.auto.commit" value="false" />
                    <entry key="auto.commit.interval.ms" value="100" />
                    <entry key="session.timeout.ms" value="15000" />
                    <entry key="group.id" value="bridge-stream-testing" />
                    <entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer" />
                    <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>

I see org.apache.kafka.common.serialization.StringDeserializer class indeed (I can click the class name and it takes me to the jar file.

Just in case this is the content in my pom file:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.xd</groupId>
        <artifactId>spring-xd-module-parent</artifactId>
        <version>1.3.2.RELEASE</version>
    </parent>
    <groupId>ejemplos.spring</groupId>
    <artifactId>kafka-source-latest-api</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
            <version>2.1.0.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.1</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.1.0.RELEASE</version>
        </dependency>


    </dependencies>

    <repositories>
        <repository>
            <id>pentaho-releases</id>
            <url>http://repository.pentaho.org/artifactory/repo/</url>
        </repository>
    </repositories>
</project>

These are the logs that I'm getting: https://gist.github.com/columb1a/2833f1ac751436df1caa730ce1a0eb37

Upvotes: 0

Views: 264

Answers (2)

Columb1a
Columb1a

Reputation: 491

I resolved my problem as follows: After having replaced the kafka-clients PLUS having added provided scope to the kafka-clients dependency in my pom file, my consumer is working properly (so far).

Upvotes: 0

Artem Bilan
Artem Bilan

Reputation: 121507

Try to place kafka-clients jar into the shared /lib instead of module classpath. I just guess, but looks like that class is tried to be load by the Thread.currentThread().getContextClassLoader(); instead of module-specific.

Upvotes: 1

Related Questions