Timothy
Timothy

Reputation: 1035

Spring Kafka @SendTo throws exception : a KafkaTemplate is required to support replies

I'm trying to get consumer result, according to Spring kafka doc.

Based on this stackoverflow question, it should be possible to do this only by using @SendTo annotation beacuse spring boot "also auto configures a kafka template if there is not one already in the context."

But I can't get it works, I still get


java.lang.IllegalStateException: a KafkaTemplate is required to support replies
    at org.springframework.util.Assert.state(Assert.java:73) ~[spring-core-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.kafka.config.MethodKafkaListenerEndpoint.createMessageListener(MethodKafkaListenerEndpoint.java:156) 
...

This is my listener method

    @KafkaListener(topics = "t_invoice")
    @SendTo("t_ledger")
    public List<LedgerEntry> consume(Invoice invoice) throws IOException {
        // do some processing

        var ledgerCredit = new LedgerEntry(invoice.getAmount(), "Credit side", 0, "");
        var ledgerDebit = new LedgerEntry(0, "", invoice.getAmount(), "Debit side");

        return List.of(ledgerCredit, ledgerDebit);
    }

What did I miss?

This my the only @Configuration file I have on consumer. Consumer & producer is separated system (e.g. payment system produce invoice to kafka, my program is accounting system that took data and create ledger entry)

@Configuration
public class KafkaConfig {

    @Autowired
    private KafkaProperties kafkaProperties;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        var properties = kafkaProperties.buildConsumerProperties();
        properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "600000");

        return new DefaultKafkaConsumerFactory<>(properties);
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

aplication.yml

spring:
  kafka:
    consumer:
      group-id: default-spring-consumer
      auto-offset-reset: earliest

Trial-Error 1

If I disable the KafkaConfig, or enable debug during run, this error exists:

org.apache.kafka.common.errors.SerializationException: Can't convert value of class com.accounting.kafkaconsumer.entity.LedgerEntry to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Caused by: java.lang.ClassCastException: class com.accounting.kafkaconsumer.entity.LedgerEntry cannot be cast to class java.lang.String (com.accounting.kafkaconsumer.entity.LedgerEntry is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')
    at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55) ~[kafka-clients-2.0.1.jar:na]
...

Trial-Error 2
If I disable KafkaConfig and using this signature (returning String), it works. But this is not expected, since my configuration is on KafkaConfig

    @KafkaListener(topics = "t_invoice")
    @SendTo("t_ledger")
    public String consume(Invoice invoice) throws IOException {
        // do some processing

        var listLedger = List.of(ledgerCredit, ledgerDebit);
        return objectMapper.writeValueAsString(listLedger);
    }

I think the problem is in here (KafkaConfig), since I create new instance of KafkaListenerContainerFactory, the replyTemplate is null. How is the correct way to set up my KafkaConfig?

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

Upvotes: 2

Views: 4894

Answers (2)

sikrip
sikrip

Reputation: 681

If you override Boot's kafkaListenerContainerFactory make sure that you set the reply template

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(KafkaTemplate<String, Object> kafkaTemplate) {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setReplyTemplate(kafkaTemplate); // <============
        return factory;
    }

Upvotes: 2

Gary Russell
Gary Russell

Reputation: 174494

If you override Boot's auto-configured container factory then it won't be... auto-configured, including applying the template. When you define your own factory, you are responsible for configuring it. It's not clear why you are overriding Boot's kafkaListenerContainerFactory bean since all you are doing is injecting the consumer factory. Just remove that @Bean and use Boot's.

Upvotes: 3

Related Questions