Reputation: 53
I have a Kafka consumer application written in Java that reads from two separate Kafka topics.
For topic1, there is a KafkaListenerContainerFactory bean named kafkaListenerContainerFactory; the code snippet is below. The messages are in Avro format, and Pojo1 is the POJO class generated from the Avro schema.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Pojo1> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Pojo1> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}
To consume messages from topic1, we have the method below:
@KafkaListener(topics = "topic1")
public boolean readTopic1(ConsumerRecord<String, Pojo1> record) {
    // logic
}
The application also reads messages from topic2, which carries Avro messages as well; Pojo2 is the POJO class generated from the corresponding Avro schema.
To read from topic2, we have the method below:
@KafkaListener(topics = "topic2")
public boolean readTopic2(ConsumerRecord<String, Pojo2> record) {
    // logic
}
What I don't understand is why consuming messages from topic2 works as expected.
kafkaListenerContainerFactory was configured with Pojo1, so how are messages from topic2 being read as Pojo2? As far as I know, a default container factory is created only if there is no bean named "kafkaListenerContainerFactory", but my application already defines a kafkaListenerContainerFactory for topic1.
Accordingly, I would expect that another KafkaListenerContainerFactory has to be created for Pojo2 and referenced in the listener, as in @KafkaListener(topics = "topic2", containerFactory = "factoryForTopic2").
Can someone help me understand how KafkaListenerContainerFactory works for different topics with different message schemas?
Upvotes: 5
Views: 6752
Reputation: 51
This only works in your case because the deserializer is able to deserialize String to both objects. But imagine you use Avro and you still have both Pojo1 and Pojo2. In that case you would need two container factories. If you do not provide one for Pojo2, an exception will be thrown: MessageConversionException: cannot convert String to Pojo2 for GenericMessage. The solution would be to define @KafkaListener(topics = "topic2", containerFactory = "factoryForTopic2").
Upvotes: 1
Reputation: 174484
It's called type erasure; the generic types are only applicable at compile time. At runtime, as long as your deserializer creates the correct type, it will work.
It's probably cleaner to use ConcurrentKafkaListenerContainerFactory<String, Object> to avoid any confusion, especially when using an Avro or JSON deserializer that can create different types.
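To make the erasure point concrete, here is a minimal plain-Java sketch with no Kafka dependency. Everything in it is a hypothetical stand-in: Factory plays the role of the container factory, deserialize plays the role of a schema-aware deserializer that picks the class from the data, and Pojo1/Pojo2 are simplified versions of the generated classes. It is only an illustration of the mechanism, not how Spring Kafka is implemented.

```java
import java.util.function.Function;

public class ErasureDemo {
    // Hypothetical stand-ins for the Avro-generated classes.
    static class Pojo1 { final String value; Pojo1(String v) { value = v; } }
    static class Pojo2 { final String value; Pojo2(String v) { value = v; } }

    // Hypothetical stand-in for a container factory: V exists only at compile time.
    static class Factory<V> {
        private final Function<String, Object> deserializer;

        Factory(Function<String, Object> deserializer) {
            this.deserializer = deserializer;
        }

        @SuppressWarnings("unchecked")
        V poll(String topic) {
            // Unchecked cast: V is erased, so nothing is verified at this point.
            return (V) deserializer.apply(topic);
        }
    }

    // Mimics a deserializer that chooses the class from the data, not from the generic type.
    static Object deserialize(String topic) {
        return topic.equals("topic1") ? new Pojo1("from topic1") : new Pojo2("from topic2");
    }

    // A factory declared for Pojo1 still delivers Pojo2, because the deserializer built one.
    public static String readTopic2() {
        Factory<Pojo1> declaredForPojo1 = new Factory<>(ErasureDemo::deserialize);
        @SuppressWarnings("unchecked")
        Factory<Pojo2> sameFactory = (Factory<Pojo2>) (Factory<?>) declaredForPojo1;
        Pojo2 message = sameFactory.poll("topic2");
        return message.getClass().getSimpleName() + ": " + message.value;
    }

    // If the deserializer builds the wrong class, the failure shows up at the use site.
    public static boolean mismatchThrows() {
        Factory<Pojo2> factory = new Factory<>(ErasureDemo::deserialize);
        try {
            String unused = factory.poll("topic1").value; // the object is really a Pojo1
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(readTopic2());
        System.out.println(mismatchThrows());
    }
}
```

The type parameter on the factory never takes part in the runtime behaviour; only the class the deserializer actually instantiates does, which is why declaring the factory with Object as the value type describes the situation more honestly.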
That said, your bean is called importDecisionMessageKafkaListenerContainerFactory. For listeners to use a non-standard factory, its name must be specified in the containerFactory property on the listener; otherwise kafkaListenerContainerFactory is used.
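As a rough sketch of what that looks like, the configuration below registers a second factory under a non-default name and points the topic2 listener at it. It assumes spring-kafka is on the classpath, that a ConsumerFactory bean with a suitable Avro deserializer and group id is defined elsewhere, and that Pojo2 is the generated class from the question. The class name Topic2ListenerConfig and the bean name factoryForTopic2 are only illustrative.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
@EnableKafka
public class Topic2ListenerConfig {

    // Assumption: the injected ConsumerFactory is defined elsewhere and its
    // value deserializer can produce Pojo2 instances.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> factoryForTopic2(
            ConsumerFactory<String, Object> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }

    // Without containerFactory, the bean named kafkaListenerContainerFactory is used.
    @KafkaListener(topics = "topic2", containerFactory = "factoryForTopic2")
    public void readTopic2(ConsumerRecord<String, Pojo2> record) {
        // logic
    }
}
```

A separate factory like this is mainly useful when the two topics need different consumer settings; if both can share one deserializer that yields the right class per topic, a single factory is enough.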
Upvotes: 7