AJK1305

Reputation: 129

Spring-Kafka @KafkaHandlers not consuming their respective Java objects

I know this has been asked multiple times, but I can't find a solution yet. I am able to consume a specific Java object with a class-level @KafkaListener and it works perfectly fine. However, when I try to consume multiple different JSON objects from the same topic (using @KafkaListener at the class level and @KafkaHandler at the method level, with each @KafkaHandler method expecting a different object), the payload always arrives as a LinkedHashMap. I could parse it and use a factory pattern to create different instances based on a JSON field, but I don't want to do that when Spring can automatically route the message to the matching @KafkaHandler. How do I consume different JSON objects from a single topic with a single @KafkaListener?

I am using the configs below:

@Bean
public ConsumerFactory<String, Object> abcConsumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");

    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(Object.class, false));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> abcListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(abcConsumerFactory());
    return factory;
}

If I use the actual class (Foo or Bar) instead of Object in the above configs, it works fine for that particular object. However, when I try to generalize it, the message does not go to the specific @KafkaHandler; instead it goes to the @KafkaHandler with payload type LinkedHashMap (I added that handler just to check whether the message gets delivered to any @KafkaHandler at all).

Class:  

    @KafkaListener(topics = {"abc_gh"}, containerFactory = "abcListenerContainerFactory")
    @Service
    public class MyListener {

        @KafkaHandler
        public void consumeMessage(@Payload Foo f) {
            // only comes here when I use Foo in my configs instead of Object
        }

        @KafkaHandler
        public void consumeMessage22(@Payload Bar b) {
            // only comes here when I use Bar in my configs instead of Object
        }

        @KafkaHandler
        public void consumeMessage77(@Payload LinkedHashMap bc) {
            // comes here when I use Object in the configs, even if I expect a Foo or a Bar object
        }
    }

One thing I want to share: the producer is not using Spring-Kafka.

I don't know what I am missing; I have tried a lot of things but no luck.

Upvotes: 2

Views: 4337

Answers (1)

Gary Russell

Reputation: 174484

As the documentation says:

When using @KafkaHandler methods, the payload must have already been converted to the domain object (so the match can be performed). Use a custom deserializer, the JsonDeserializer, or the JsonMessageConverter with its TypePrecedence set to TYPE_ID. See Serialization, Deserialization, and Message Conversion for more information.

In order to route to the proper method, the ConsumerRecord must already have the correct type in it.

config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

You are not providing the deserializer with any information about what object to create.

When Spring is the producer, it adds type information in record headers which can be used by the deserializer to construct the right type.

If there is no type information, you'll get a Map.

The producer has to set the type headers for this to work.
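
As a minimal sketch (not your actual code), a plain non-Spring producer could add that header itself. The topic "abc_gh" is from your question; the class name "com.example.Foo" and the JSON payload are placeholders for illustration. __TypeId__ is the header name Spring's JsonDeserializer inspects by default:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    ProducerRecord<String, String> record = new ProducerRecord<>("abc_gh", "{\"someField\":\"someValue\"}");
    // __TypeId__ is the default type header read by Spring's JsonDeserializer
    record.headers().add("__TypeId__", "com.example.Foo".getBytes(StandardCharsets.UTF_8));
    producer.send(record);
}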

When the @KafkaListener is at the method level, we can determine the type to create from the method parameters. At the class level, we have a catch-22: we can't choose a method until the record has already been converted.

Your producer doesn't have to know the actual type, but it at least has to provide a header that can be used to look up the type we need to convert to.

See Mapping Types.
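
With type mapping, the producer only needs to send a short token (say "foo" or "bar") in the __TypeId__ header and the consumer maps it to the target class. A sketch of your consumer factory, assuming Foo and Bar live in a hypothetical com.example package:

@Bean
public ConsumerFactory<String, Object> abcConsumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    config.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example");
    // map the tokens sent in the __TypeId__ header to the target classes
    config.put(JsonDeserializer.TYPE_MAPPINGS, "foo:com.example.Foo, bar:com.example.Bar");
    // let the factory create and configure the JsonDeserializer from the properties above
    return new DefaultKafkaConsumerFactory<>(config);
}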

Alternatively, the producer's JSON serializer must be configured to add type information into the JSON itself.
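
For example (an assumption, not something from your question), if Foo and Bar could share a common base class, Jackson's polymorphic type handling would embed a discriminator property in the JSON that the deserializer can use; the producer then has to write that "type" property into every message:

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;

// Hedged sketch: assumes Foo and Bar can extend a common base class and that the
// producer writes a "type" property ("foo" or "bar") into each JSON message.
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
@JsonSubTypes({
        @JsonSubTypes.Type(value = Foo.class, name = "foo"),
        @JsonSubTypes.Type(value = Bar.class, name = "bar")
})
public abstract class BaseEvent {
}

The consumer factory would then create the value deserializer as new JsonDeserializer<>(BaseEvent.class, false); Jackson resolves the concrete subtype from the "type" property, so the records arrive as Foo or Bar and the matching @KafkaHandler is chosen.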

Another option is a custom deserializer that "peeks" at the JSON to determine what class to instantiate.
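
A rough sketch of such a deserializer, assuming the messages contain some distinguishing field (here a hypothetical "eventType" property):

import java.io.IOException;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class PeekingJsonDeserializer implements Deserializer<Object> {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public Object deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            // peek at the raw JSON first, then bind it to the right class
            JsonNode node = mapper.readTree(data);
            Class<?> target = "foo".equals(node.path("eventType").asText()) ? Foo.class : Bar.class;
            return mapper.treeToValue(node, target);
        }
        catch (IOException e) {
            throw new SerializationException("Failed to deserialize JSON payload", e);
        }
    }
}

You would pass it to the consumer factory in place of the JsonDeserializer, e.g. new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new PeekingJsonDeserializer()).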

Upvotes: 2
