Reputation: 17
This is the Kafka configuration of my producer microservice:
@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapService;

    // producer factory
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configMap = new HashMap<>();
        configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapService);
        configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configMap);
    }

    // send messages
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
I send the messages like this:
kafkaTemplate.send(TOPIC_NAME, message);
I have no problems with the producer.
This is the Kafka configuration of the consumer microservice:
@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>();
        jsonDeserializer.addTrustedPackages("*");
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), jsonDeserializer);
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        return Map.of(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class,
            ConsumerConfig.GROUP_ID_CONFIG, "testId"
        );
    }
}
With the same configuration, but with String instead of Object, or with my custom object instead, I get the following stack trace:
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:149) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1763) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1286) ~[spring-kafka-2.8.3.jar:2.8.3]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition kafka-topic-2-0 at offset 25. If needed, please seek past the record to continue consumption.
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1429) ~[kafka-clients-3.0.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:134) ~[kafka-clients-3.0.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1652) ~[kafka-clients-3.0.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1800(Fetcher.java:1488) ~[kafka-clients-3.0.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:721) ~[kafka-clients-3.0.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:672) ~[kafka-clients-3.0.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1277) ~[kafka-clients-3.0.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238) ~[kafka-clients-3.0.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.0.0.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1510) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1500) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1328) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1237) ~[spring-kafka-2.8.3.jar:2.8.3]
... 3 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.example.kafkaproducer.model.Message]; nested exception is java.lang.ClassNotFoundException: com.example.kafkaproducer.model.Message
at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:142) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:103) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:569) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1420) ~[kafka-clients-3.0.0.jar:na]
... 15 common frames omitted
Caused by: java.lang.ClassNotFoundException: com.example.kafkaproducer.model.Message
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[na:na]
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[na:na]
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[na:na]
at java.base/java.lang.Class.forName0(Native Method) ~[na:na]
at java.base/java.lang.Class.forName(Class.java:398) ~[na:na]
at org.springframework.boot.devtools.restart.classloader.RestartClassLoader.loadClass(RestartClassLoader.java:145) ~[spring-boot-devtools-2.6.4.jar:2.6.4]
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[na:na]
at java.base/java.lang.Class.forName0(Native Method) ~[na:na]
at java.base/java.lang.Class.forName(Class.java:398) ~[na:na]
at org.springframework.util.ClassUtils.forName(ClassUtils.java:284) ~[spring-core-5.3.16.jar:5.3.16]
at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:138) ~[spring-kafka-2.8.3.jar:2.8.3]
... 18 common frames omitted
Do you have any advice for having producer and consumer in different services?
[Edit] I'm adding the listener, since it was requested:
@KafkaListener(topics = TOPIC_NAME, groupId = "testId")
public void listener(@Payload Message rcvMessage) {
    log.info("message: {}", rcvMessage);
}
Note that the Message class is defined in both projects with the same fields.
[Edit 2] It now works. I told the deserializer not to use the type headers, like this:
@Bean
public ConsumerFactory<String, Message> consumerFactory() {
    JsonDeserializer<Message> jsonDeserializer = new JsonDeserializer<>(Message.class, false);
    jsonDeserializer.addTrustedPackages("*");
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), jsonDeserializer);
}
Is this acceptable, or is it just a workaround?
Upvotes: 1
Views: 2621
Reputation: 1
The class must have the same name and package in both services, so that the consumer can deserialize the object into the same type the producer serialized. In my case I created two microservices, an order service and a notification service. The order service is the producer: it sends an order object, with an id and a name, to the Kafka cluster. The notification service is the consumer that reads from it. I initially hit the same deserialization error. It went away after I created the same order class in the notification service, in the same package as in the order service: the class is in com.pn.orderservice.dto in the order service, so it must be in com.pn.orderservice.dto in the notification service as well. With that in place, the message is deserialized correctly on the consumer side.
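If keeping identical packages in both services is inconvenient, Spring Kafka's JSON serializer and deserializer also support type mappings, where the header carries a short token instead of a class name. The following is only a minimal sketch under assumptions: the token order, the class name Order, and the consumer package com.pn.notificationservice.dto are hypothetical. The property keys are written as plain strings (they correspond to JsonSerializer.TYPE_MAPPINGS, JsonDeserializer.TYPE_MAPPINGS and JsonDeserializer.TRUSTED_PACKAGES) so that the snippet compiles without Kafka on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

public class TypeMappingProps {

    // Producer side: map the hypothetical token "order" to the producer's class.
    public static Map<String, Object> producerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("value.serializer",
                "org.springframework.kafka.support.serializer.JsonSerializer");
        props.put("spring.json.type.mapping",
                "order:com.pn.orderservice.dto.Order");
        return props;
    }

    // Consumer side: map the same token to the consumer's own class,
    // which may live in a different (here hypothetical) package.
    public static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.JsonDeserializer");
        props.put("spring.json.trusted.packages",
                "com.pn.notificationservice.dto");
        props.put("spring.json.type.mapping",
                "order:com.pn.notificationservice.dto.Order");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps());
        System.out.println(consumerProps());
    }
}
```

These entries would be merged into the maps passed to the producer and consumer factories. As far as I know, they only take effect when Kafka creates the serializer and deserializer from the configuration; an instance passed to the factory constructor has to be configured in code instead.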
Upvotes: 0
Reputation: 94
When you send a message to Kafka with JsonSerializer, a header carrying type information is sent along with the message. By default, that header contains the fully qualified class name of the custom object.
For example:
com.example.kafkaproducer.model.Message
The Message class on the consumer side must therefore be in the same package as on the producer side. When the message is received, the deserializer tries to load the class named in the header; if the consumer's class is in a different package, the lookup fails with the following error:
Caused by: java.lang.ClassNotFoundException: com.example.kafkaproducer.model.Message
Most likely, moving the Message class on the consumer side into the same package as on the producer side will fix your error.
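To make the mechanism concrete, here is a simplified, standard-library-only sketch of what resolving the type header amounts to. It is not Spring Kafka's actual code: the resolve method and the ConsumerMessage class are hypothetical stand-ins, and the explicit mapping table is only an illustration of how a header value could be tied to a local class.

```java
import java.util.Map;

public class TypeHeaderDemo {

    // Hypothetical stand-in for the consumer's own Message class,
    // which lives in a different package than the producer's.
    static class ConsumerMessage {}

    // Resolve the type id from the header: use an explicit mapping if one
    // exists, otherwise treat the id as a class name and try to load it.
    public static Class<?> resolve(String typeId, Map<String, Class<?>> mappings)
            throws ClassNotFoundException {
        Class<?> mapped = mappings.get(typeId);
        if (mapped != null) {
            return mapped;
        }
        return Class.forName(typeId);
    }

    public static void main(String[] args) throws Exception {
        String header = "com.example.kafkaproducer.model.Message";

        // No mapping and no such class on the consumer's classpath:
        // the lookup fails, as in the stack trace from the question.
        try {
            resolve(header, Map.of());
        } catch (ClassNotFoundException e) {
            System.out.println("ClassNotFoundException: " + e.getMessage());
        }

        // With a mapping, the same header value resolves to a local class.
        Class<?> target = resolve(header, Map.of(header, ConsumerMessage.class));
        System.out.println("resolved to " + target.getSimpleName());
    }
}
```

The first call fails for the same reason as the consumer in the question: the name in the header does not exist in that JVM. Putting the class in the same package, or telling the deserializer which local class to use, removes the failing lookup.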
Upvotes: 3