dermoritz

Reputation: 12991

Apache Kafka: ...StringDeserializer is not an instance of ...Deserializer

In my simple application I am trying to instantiate a KafkaConsumer. My code is nearly a copy of the code from the Javadoc ("Automatic Offset Committing"):

@Slf4j
public class MyKafkaConsumer {

    public MyKafkaConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe( Arrays.asList("mytopic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                log.info( record.offset() + record.key() + record.value() );
                //System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}

When I try to instantiate this class, I get:

org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:781)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:635)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:617)
        at ...MyKafkaConsumer.<init>(SikomKafkaConsumer.java:23)
        ...
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.StringDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:680)
        ... 48 more

How can I fix this?

Upvotes: 3

Views: 7837

Answers (3)

Arun

Reputation: 1085

Your custom class needs to implement org.apache.kafka.common.serialization.Deserializer, for example:

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.Serializable;
import java.util.Map;

//Developed by Arun Singh
public class Employee implements Serializable, Serializer<Employee>, Deserializer<Employee> {

    // ObjectMapper is thread-safe once configured, so one shared instance is enough.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Serializer and Deserializer both declare configure() and close(),
    // so a class implementing both interfaces has to override them.
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, Employee employee) {
        try {
            // Required by Serializer: write the object as JSON bytes.
            return MAPPER.writeValueAsBytes(employee);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    @Override
    public Employee deserialize(String topic, byte[] bytes) {
        Employee employee = null;
        try {
            // Parse the raw bytes directly. bytes.toString() would only yield the
            // array's identity string (something like "[B@1b6d3586"), not its contents.
            employee = MAPPER.readValue(bytes, Employee.class);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return employee;
    }

    @Override
    public Employee deserialize(String topic, Headers headers, byte[] data) {
        // Headers are not used here, so delegate to the two-argument variant.
        return deserialize(topic, data);
    }

    @Override
    public void close() {
    }
}

Upvotes: -1

luke

Reputation: 4155

This might be a Kafka class-loading problem.
Temporarily setting the thread's context class loader to null while the consumer is constructed might help.

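...
Thread currentThread = Thread.currentThread();
ClassLoader savedClassLoader = currentThread.getContextClassLoader();

KafkaConsumer<String, String> consumer;
try {
    // Clear the context class loader only while the consumer is constructed.
    currentThread.setContextClassLoader(null);
    consumer = new KafkaConsumer<>(props);
} finally {
    // Always restore the original class loader, even if construction fails.
    currentThread.setContextClassLoader(savedClassLoader);
}
...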

A full explanation is here:
https://stackoverflow.com/a/50981469/1673775
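
To check whether class loading really is the culprit, it can help to print which class loader defined each class involved. The following is only a minimal sketch using plain JDK reflection; the class name ClassOrigin and its two helper methods are made up for illustration and are not part of Kafka.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper (not part of Kafka or the JDK).
public class ClassOrigin {

    /** Describes which class loader defined the class and where it was loaded from. */
    public static String describe(Class<?> type) {
        ClassLoader loader = type.getClassLoader();
        // A null loader means the class came from the JVM's bootstrap class loader.
        String loaderName = (loader == null) ? "bootstrap" : loader.toString();
        CodeSource source = type.getProtectionDomain().getCodeSource();
        String location = (source == null || source.getLocation() == null)
                ? "unknown"
                : source.getLocation().toString();
        return type.getName() + " <- " + loaderName + " @ " + location;
    }

    /** True only when both classes were defined by the same class loader. */
    public static boolean sameLoader(Class<?> a, Class<?> b) {
        return a.getClassLoader() == b.getClassLoader();
    }

    public static void main(String[] args) {
        System.out.println(describe(String.class));
        System.out.println(describe(ClassOrigin.class));
    }
}
```

In the failing application you would pass Deserializer.class and the class object obtained via Class.forName("org.apache.kafka.common.serialization.StringDeserializer", true, Thread.currentThread().getContextClassLoader()). If the two report different loaders or different jar locations, the "is not an instance of" message is explained, because the JVM treats a class loaded by two different class loaders as two unrelated types.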

Upvotes: 2

E.Sully

Reputation: 11

Not sure if this is what finally fixed your error, but note that when using spring-kafka-test (version 2.1.x, starting with version 2.1.5) with the 1.1.x kafka-clients jar, you will need to override certain transitive dependencies as follows:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>${spring.kafka.version}</version>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <version>${spring.kafka.version}</version>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.1</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.1</version>
    <classifier>test</classifier>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.1.1</version>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.1.1</version>
    <classifier>test</classifier>
    <scope>test</scope>
</dependency>

So the error could well have been caused by a conflicting transitive dependency.
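
One way to check for such a conflict at runtime is to ask the class loader for every copy of the Deserializer class file it can see. This is a rough sketch that assumes all jars are visible to the thread's context class loader; DuplicateClassFinder and its methods are illustrative names, not an existing API.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for spotting the same class in more than one jar.
public class DuplicateClassFinder {

    /** Converts "a.b.C" into the resource path "a/b/C.class". */
    public static String toResourceName(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Returns every location from which the class file is visible. */
    public static List<URL> findCopies(String className) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            // Fall back to the system class loader when no context loader is set.
            loader = ClassLoader.getSystemClassLoader();
        }
        try {
            return Collections.list(loader.getResources(toResourceName(className)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String name = "org.apache.kafka.common.serialization.Deserializer";
        List<URL> copies = findCopies(name);
        System.out.println(copies.size() + " location(s) for " + name);
        for (URL url : copies) {
            System.out.println("  " + url);
        }
    }
}
```

If more than one location is printed, two jars on the classpath ship the same class, which usually points to mixed kafka-clients versions. Running mvn dependency:tree shows which dependency pulls in each version.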

Upvotes: 1
