user12505474
user12505474

Reputation: 17

Spring boot Kafka send objects between different microservices

This is my producer microservice kafka configuration:

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapService;

    //producer factory
    @Bean
    public ProducerFactory<String, Object> producerFactory (){
        Map<String, Object> configMap = new HashMap<>();
        configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapService);
        configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return new DefaultKafkaProducerFactory<>(configMap);
    }

    //inviare messaggi
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate(){
        return new KafkaTemplate<>(producerFactory());
    }
}

I send the messages like this:

kafkaTemplate.send(TOPIC_NAME, message);

I have no problems with the producer,

This is the consumer microservice kafka configuration:

@Configuration
public class KafkaConsumerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>();
        jsonDeserializer.addTrustedPackages("*");
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), jsonDeserializer);
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        return Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class,
                ConsumerConfig.GROUP_ID_CONFIG, "testId"
        );
    }
}

With the same configuration but with String istead of Object or instead of my custom object I get the following stacktrace:

    java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
    at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:149) ~[spring-kafka-2.8.3.jar:2.8.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1763) ~[spring-kafka-2.8.3.jar:2.8.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1286) ~[spring-kafka-2.8.3.jar:2.8.3]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition kafka-topic-2-0 at offset 25. If needed, please seek past the record to continue consumption.
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1429) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:134) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1652) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1800(Fetcher.java:1488) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:721) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:672) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1277) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.0.0.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1510) ~[spring-kafka-2.8.3.jar:2.8.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1500) ~[spring-kafka-2.8.3.jar:2.8.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1328) ~[spring-kafka-2.8.3.jar:2.8.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1237) ~[spring-kafka-2.8.3.jar:2.8.3]
    ... 3 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.example.kafkaproducer.model.Message]; nested exception is java.lang.ClassNotFoundException: com.example.kafkaproducer.model.Message
    at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:142) ~[spring-kafka-2.8.3.jar:2.8.3]
    at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:103) ~[spring-kafka-2.8.3.jar:2.8.3]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:569) ~[spring-kafka-2.8.3.jar:2.8.3]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1420) ~[kafka-clients-3.0.0.jar:na]
    ... 15 common frames omitted
Caused by: java.lang.ClassNotFoundException: com.example.kafkaproducer.model.Message
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[na:na]
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[na:na]
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[na:na]
    at java.base/java.lang.Class.forName0(Native Method) ~[na:na]
    at java.base/java.lang.Class.forName(Class.java:398) ~[na:na]
    at org.springframework.boot.devtools.restart.classloader.RestartClassLoader.loadClass(RestartClassLoader.java:145) ~[spring-boot-devtools-2.6.4.jar:2.6.4]
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[na:na]
    at java.base/java.lang.Class.forName0(Native Method) ~[na:na]
    at java.base/java.lang.Class.forName(Class.java:398) ~[na:na]
    at org.springframework.util.ClassUtils.forName(ClassUtils.java:284) ~[spring-core-5.3.16.jar:5.3.16]
    at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:138) ~[spring-kafka-2.8.3.jar:2.8.3]
    ... 18 common frames omitted

Do you have any advice for having producer and consumer in different services?

[EDIT] I'm adding the listener since it has been asked:

@KafkaListener(topics = TOPIC_NAME, groupId = "testId")
public void listener(@Payload Message rcvMessage){

    log.info("message: {}", rcvMessage);

}

Note that the Message class is defined in both projects with the same parameters.

[Edit 2] It now works, I removed the headers in the deserializer like this:

@Bean
public ConsumerFactory<String, Message> consumerFactory() {
    JsonDeserializer<Message> jsonDeserializer = new JsonDeserializer<>(Message.class, false);
    jsonDeserializer.addTrustedPackages("*");
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), jsonDeserializer);
}

Is this acceptable or is it just a work around?

Upvotes: 1

Views: 2621

Answers (2)

pnpavan kumar
pnpavan kumar

Reputation: 1

We need to keep the same naming and packaging convention so that while deserializing the object will be serialized in same manner. For me I created 2 micro service named order service and notification service. when i sent order object with id and name from order micro service which is generally a producer to Kafka cluster and onn other side we have notification service which is reading l from it. I faced the same issue initially by getting this de serialization error After I created the same order object in notification service like I created in order service (Note: same package name like in order it is com.pn.orderservice.dto and it should be same in notification service as well like com.pn.orderservice.dto) It will serialize and correct message will come in consumer.

Upvotes: 0

Ali Mohammadi
Ali Mohammadi

Reputation: 94

When you send a message to kafka, a header is sent along with the message. Inside the header, by default, is the full address of the Custom Object.

For example:

 com.example.kafkaproducer.model.Message

The consumer side of the message class must be created in the same path of the producer. Because validation is done when receiving the message on the consumer side, if a path other than the path is specified, the following error is received.

Caused by: java.lang.ClassNotFoundException: com.example.kafkaproducer.model.Message

Most likely, if you change the Message class path on the consumer side, your error will be fixed.

Upvotes: 3

Related Questions