alexanoid

Reputation: 25842

Spring Kafka multiple serializers and consumer/container factories

In my Spring Boot application, I configured Kafka with the following Sender/Receiver:

@Configuration
public class KafkaSenderConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {

        Map<String, Object> props = new HashMap<>();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return props;
    }

    @Bean
    public ProducerFactory<String, ImportDecisionMessage> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, ImportDecisionMessage> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

@Configuration
public class KafkaReceiverConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String consumerGroupId;

    @Bean
    public Map<String, Object> consumerConfigs() {

        Map<String, Object> props = new HashMap<>();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

        return props;
    }

    @Bean
    public ConsumerFactory<String, ImportDecisionMessage> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(ImportDecisionMessage.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ImportDecisionMessage> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, ImportDecisionMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

}

With this configuration I can only work with a single POJO, ImportDecisionMessage, using JsonSerializer to send and JsonDeserializer to receive messages.

I also need to be able to send other POJOs as messages, for example Product, Car, and Category.

In addition, I'd like to use a different serializer, org.apache.kafka.common.serialization.BytesSerializer, for Car.

How do I properly extend my configuration to support these types and this serializer?

Upvotes: 1

Views: 5841

Answers (1)

Artem Bilan

Reputation: 121462

That’s impossible, because:

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

are plain Apache Kafka properties, and the Kafka consumer supports only a simple deserialization strategy: one key deserializer and one value deserializer per consumer. Consider applying your custom logic downstream instead, starting from the raw byte[] returned by the Kafka Consumer.
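To illustrate what decoding downstream from the raw byte[] might look like, here is a minimal sketch in plain Java with no Kafka dependency. The class name RawPayloadDispatcher, its register and decode methods, and the idea of choosing a decoder by a type name are all assumptions made for illustration. In a real listener the byte[] would presumably come from a consumer whose value deserializer is ByteArrayDeserializer, and the type name could be derived from the topic or a record header.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper: picks a decoder for a raw payload based on a type name.
public class RawPayloadDispatcher {

    // Type name (for example taken from the topic or a header) -> decoder.
    private final Map<String, Function<byte[], Object>> decoders = new HashMap<>();

    // Registers the decoder to use for payloads of the given type.
    public void register(String typeName, Function<byte[], Object> decoder) {
        decoders.put(typeName, decoder);
    }

    // Decodes the raw payload with the decoder registered for typeName.
    public Object decode(String typeName, byte[] payload) {
        Function<byte[], Object> decoder = decoders.get(typeName);
        if (decoder == null) {
            throw new IllegalArgumentException("No decoder registered for type: " + typeName);
        }
        return decoder.apply(payload);
    }

    public static void main(String[] args) {
        RawPayloadDispatcher dispatcher = new RawPayloadDispatcher();

        // Stand-in decoders: real ones would call a JSON mapper or a binary parser.
        dispatcher.register("Product", bytes -> new String(bytes, StandardCharsets.UTF_8));
        dispatcher.register("Car", bytes -> bytes.length);

        byte[] raw = "{\"name\":\"book\"}".getBytes(StandardCharsets.UTF_8);
        System.out.println(dispatcher.decode("Product", raw));
        System.out.println(dispatcher.decode("Car", new byte[] {1, 2, 3}));
    }
}
```

The stand-in decoders only turn the bytes into a String or a length; the point is that the choice of decoding logic moves out of the Kafka properties and into application code, where each message type can be handled differently.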

Upvotes: 2
