Reputation: 25842
In my Spring Boot application, I configured Kafka with the following Sender/Receiver:
@Configuration
public class KafkaSenderConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, ImportDecisionMessage> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, ImportDecisionMessage> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
@Configuration
public class KafkaReceiverConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String consumerGroupId;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return props;
    }

    @Bean
    public ConsumerFactory<String, ImportDecisionMessage> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(
                consumerConfigs(),
                new StringDeserializer(),
                new JsonDeserializer<>(ImportDecisionMessage.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ImportDecisionMessage> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, ImportDecisionMessage> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
With this configuration, I can currently only work with a single POJO, ImportDecisionMessage, and only use JsonSerializer/JsonDeserializer to send and receive messages.
I also need to be able to send other POJOs as messages, for example Product, Car, and Category.
In addition, I'd like to use a different serializer, org.apache.kafka.common.serialization.BytesSerializer, for Car.
How do I properly extend my configuration to support these types and this serializer?
Upvotes: 1
Views: 5841
Reputation: 121462
That’s impossible because:
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
are plain Apache Kafka properties, and the Kafka consumer supports only a simple deserialization strategy: one key deserializer and one value deserializer. You should consider implementing your custom logic downstream, starting from the raw byte[] returned by the Kafka consumer.
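To illustrate the "custom logic downstream" idea, here is a minimal sketch of a dispatcher that picks a decoder per message type once you already have the raw byte[]. Everything in it is an assumption made for illustration: PayloadDispatcher, register, decode, and the string type identifier are hypothetical names, not part of Spring Kafka or the Kafka client. It also assumes the producer tells you the type somehow, for example in a record header.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper (not a Spring Kafka or Kafka client class):
// maps a type identifier to a function that decodes the raw payload.
final class PayloadDispatcher {

    private final Map<String, Function<byte[], ?>> decoders = new HashMap<>();

    // Registers a decoder for one message type, e.g. "Car" or "Product".
    <T> PayloadDispatcher register(String typeId, Function<byte[], T> decoder) {
        decoders.put(typeId, decoder);
        return this;
    }

    // Looks up the decoder for the given type and applies it to the raw bytes.
    Object decode(String typeId, byte[] payload) {
        Function<byte[], ?> decoder = decoders.get(typeId);
        if (decoder == null) {
            throw new IllegalArgumentException("No decoder registered for type: " + typeId);
        }
        return decoder.apply(payload);
    }
}
```

In a real application you would probably configure the consumer with ByteArrayDeserializer for the value, call decode from the listener, and have the JSON decoders delegate to something like Jackson's ObjectMapper.readValue(bytes, Product.class), while the Car decoder could keep the bytes as they are.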
Upvotes: 2