Reputation: 1698
I have implemented a simple consumer application to consume messages from the topics. When I run the kafka-consumer application, the following error occurs.
Stack trace
org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) ~[spring-context-5.2.9.RELEASE.jar:5.2.9.RELEASE]
org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.StringSerializer is not an instance of org.apache.kafka.common.serialization.Deserializer
Config class
@Configuration
@EnableKafka
public class KafkaConfig {

    private ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new ConcurrentHashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_string");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
Listener class
@Component
public class KafkaConsumer {

    @KafkaListener(topics = {"Kafka_Example"}, groupId = "group_string")
    public void consume(String message) {
        System.out.println("Consumed Message " + message);
    }
}
pom.xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <version>2.6.0</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.11.1</version>
</dependency>
Note - My Kafka version is 2.13-2.6.0 (that is, Kafka 2.6.0 built for Scala 2.13)
Upvotes: 2
Views: 6068
Reputation: 57204
You are using a StringSerializer but should use a StringDeserializer: one serializes, the other deserializes. And since you set them for ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG and ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, you apparently want to deserialize.
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
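To see where a message like "is not an instance of" can come from, here is a minimal sketch of that kind of type check. It assumes the consumer instantiates the configured class and then verifies it against the expected interface. Everything in it (DeserializerCheckSketch, newConfiguredInstance, and the nested Serializer/Deserializer types) is a hypothetical stand-in written for illustration, not Kafka's actual code.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-ins for illustration only; these are NOT Kafka's classes.
class DeserializerCheckSketch {

    interface Serializer<T> { byte[] serialize(T data); }

    interface Deserializer<T> { T deserialize(byte[] data); }

    static class StringSerializer implements Serializer<String> {
        public byte[] serialize(String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    static class StringDeserializer implements Deserializer<String> {
        public String deserialize(byte[] data) {
            return new String(data, StandardCharsets.UTF_8);
        }
    }

    // Instantiate the configured class, then verify it implements the expected type.
    static <T> T newConfiguredInstance(Class<?> configured, Class<T> expected) {
        Object instance;
        try {
            instance = configured.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + configured.getName(), e);
        }
        if (!expected.isInstance(instance)) {
            // A serializer configured where a deserializer is expected ends up here.
            throw new IllegalArgumentException(
                    configured.getName() + " is not an instance of " + expected.getName());
        }
        return expected.cast(instance);
    }

    public static void main(String[] args) {
        // The deserializer passes the check.
        Deserializer<?> ok = newConfiguredInstance(StringDeserializer.class, Deserializer.class);
        System.out.println("Accepted: " + ok.getClass().getSimpleName());

        // The serializer is rejected, mirroring the mistake in the question.
        try {
            newConfiguredInstance(StringSerializer.class, Deserializer.class);
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

In the real configuration, the class to import for the fix is org.apache.kafka.common.serialization.StringDeserializer. The wrong class compiles without complaint because the config map accepts any Object as a value, so the mismatch only shows up at startup.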
Upvotes: 4