Reputation: 36
KafkaConsumerConfig.java
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, pool);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);
return props;
}
public ConsumerFactory<String, MetadataFileIntegrationDTO> consumerFactoryMetadataFileIntegration() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
new JsonDeserializer<>(MetadataFileIntegrationDTO.class, false));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MetadataFileIntegrationDTO> kafkaListenerContainerFactoryMetadataFileIntegration() {
ConcurrentKafkaListenerContainerFactory<String, MetadataFileIntegrationDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setErrorHandler(new KafkaErrorHandler());
factory.setMessageConverter(new StringJsonMessageConverter());
factory.setConsumerFactory(consumerFactoryMetadataFileIntegration());
return factory;
}
MetadataFileCustom.Java
@KafkaListener(topics = TOPIC,
groupId = GROUP,
containerFactory = "kafkaListenerContainerFactoryMetadataFileIntegration")
public void streamListener(MetadataFileIntegrationDTO metadataFileIntegrationDTO) {
log.info(TOPIC+ "===> RECEIVED MESSAGE:" + metadataFileIntegrationDTO);
metadataFileService.save(metadataFileIntegrationDTO);
}
if I change my consumerFactoryMetadataFileIntegration to
public ConsumerFactory consumerFactoryMetadataFileIntegration() {
return new DefaultKafkaConsumerFactory(consumerConfigs(), new StringDeserializer(),
new StringDeserializer());
}
works, but the sonar complains..
Error: Listener failed; nested exception is java.lang.IllegalStateException: Only String, Bytes, or byte[] supported
Upvotes: 1
Views: 1198
Reputation: 1000
You can imagine the consumer flow with MessageConverter like:
Native deserializer (StringDeserializer
in your case) deserializes byte[]
messages to String
messages.
Consumer.poll() returns these String
messages.
Your MessageConverter (StringJsonMessageConverter
) converts these String
messages to your type MetadataFileIntegrationDTO
(determined by params in @KafkaListener)
So when you defined your native deserializer as JsonDeserializer
(corresponding to ConsumerFactory<String, MetadataFileIntegrationDTO>
), the consumer.poll() returned MetadataFileIntegrationDTO
messages, and that wasn't the type the StringJsonMessageConverter
can process (you could see Only String, Bytes, or byte[] supported)
And when you changed JsonDeserializer
to StringDeserializer
, the corresponding ConsumerFactory
was ConsumerFactory<String, String>
. This means that when you create a new Consumer
from this ConsumerFactory
, the consumer.poll() returns String
.
Upvotes: 2