user1906450

Reputation: 557

Does Spring always require KafkaTemplate?

Does Spring Boot always require creating a bean of type KafkaTemplate? Details, stack trace, and code are below; please tell me if what I am doing is incorrect.

  1. I have been posting messages to a topic from a Spring Boot project
  2. To create callback mechanisms, I have used org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord<K, V>, Callback) to send the message and register a callback at the same time
  3. The reason I did it this way is that the ListenableFuture returned when using KafkaTemplate only provides the exception on failures (and I wanted to register callbacks as a separate, reusable class across all my use cases)
  4. However, Spring fails to start up when I don't define a bean of type KafkaTemplate, with the following error:
Caused by: org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'kafkaTemplate' defined in class path resource [org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.class]: Unsatisfied dependency expressed through method 'kafkaTemplate' parameter 0; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.core.ProducerFactory<java.lang.Object, java.lang.Object>' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}
    at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:800) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:541) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1352) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1195) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:582) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:542) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:335) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:276) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1380) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory$DependencyObjectProvider.getIfUnique(DefaultListableBeanFactory.java:2063) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration.<init>(KafkaAnnotationDrivenConfiguration.java:90) ~[spring-boot-autoconfigure-2.4.12.jar:2.4.12]
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:na]
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490) ~[na:na]
    at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:211) ~[spring-beans-5.3.12.jar:5.3.12]
    ... 22 common frames omitted
Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.core.ProducerFactory<java.lang.Object, java.lang.Object>' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.raiseNoMatchingBeanFound(DefaultListableBeanFactory.java:1790) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1346) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1300) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:887) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:791) ~[spring-beans-5.3.12.jar:5.3.12]
    ... 40 common frames omitted

My Kafka config is below

@Configuration
public class KafkaEventConfig {

    private final KafkaProperties kafkaProperties;

    @Value("${client.id}")
    private String clientId;


    @Value("${topic.movie.name}")
    private String movieTopicName;
    
    @Value("${retry.backoff.ms}")
    private int retryBackoffMilliseconds;

    @Value("${request.timeout.ms}")
    private int requestTimeoutMilliseconds;

    public KafkaEventConfig(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    @Bean
    public ProducerFactory<String, Movie> producerFactory() {
        Map<String, Object> props = kafkaProperties.buildProducerProperties();
        populateCommonProperties(props);
        return new DefaultKafkaProducerFactory<>(props);
    }

    private void populateCommonProperties(Map<String, Object> props) {
        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMilliseconds);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMilliseconds);
    }
    
    @Bean
    public KafkaProducer<String, Movie> movieKafkaProducer() {
        return new KafkaProducer<String, Movie>(producerFactory().getConfigurationProperties());
    }

    @Bean
    public KafkaProducerMonitor kafkaProducerMonitor(KafkaProducer<String, Movie> kafkaProducer,
            MeterRegistry registry) {
        return new KafkaProducerMonitor(kafkaProducer, registry, Tags.of("topic", movieTopicName));
    }
}

My Kafka Callback is below

@Slf4j 
public class KafkaProducerCallBack<K, V> implements Callback {

    private final ProducerRecord<K, V> producerRecord;

    public KafkaProducerCallBack(ProducerRecord<K, V> producerRecord) {
        this.producerRecord = producerRecord;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        String topicName = metadata.topic();
        long offset = metadata.offset();

        if (exception != null) {
            log.error("Failed to produce message [{}] to topic {} with exception {}", producerRecord, topicName, exception);
        } else {
            log.info("Successfully published message [{}] to topic {} at offset {}", producerRecord, topicName, offset);
        }
    }

}

I publish messages like so

movieKafkaProducer.send(message, new KafkaProducerCallBack<String, Movie>(message));

Please note that the moment I add the lines below to KafkaEventConfig, everything works fine:

@Bean
public KafkaTemplate<String, Movie> movieKafkaTemplate() {
    return new KafkaTemplate<String, Movie>(producerFactory());
}

Upvotes: 2

Views: 10018

Answers (3)

Soumya Sahoo

Reputation: 1

<dependencies>
    <!-- Spring for Apache Kafka (KafkaTemplate, listener containers, etc.) -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <!-- Kafka client library -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.6.0</version> <!-- Make sure to use the latest version -->
    </dependency>
</dependencies>

KafkaConsumerConfig.java
package com.example.kafkaconsumer.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-kafka-broker-host:port");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "your-group-id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // SSL Configuration
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "path/to/your/truststore.jks");
        props.put("ssl.truststore.password", "your-truststore-password");
        props.put("ssl.keystore.location", "path/to/your/keystore.jks");
        props.put("ssl.keystore.password", "your-keystore-password");
        props.put("ssl.key.password", "your-key-password");

        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> messageListenerContainer() {
        ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(
                consumerFactory(), 
                new ContainerProperties("your-topic")
        );
        container.setupMessageListener((MessageListener<String, String>) record -> {
            System.out.println("Received: " + record.value());
        });
        return container;
    }
}

application.properties
# Kafka properties
spring.kafka.bootstrap-servers=your-kafka-broker-host:port
spring.kafka.consumer.group-id=your-group-id
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# SSL properties
spring.kafka.ssl.truststore.location=classpath:truststore.jks
spring.kafka.ssl.truststore.password=your-truststore-password
spring.kafka.ssl.keystore.location=classpath:keystore.jks
spring.kafka.ssl.keystore.password=your-keystore-password
spring.kafka.ssl.key.password=your-key-password


Upvotes: 0

Linh Vu

Reputation: 990

In addition to the latter option that @M.Deinum mentioned:

Take a look at the KafkaAutoConfiguration class:

    @Bean
    @ConditionalOnMissingBean(KafkaTemplate.class)
    public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
            ProducerListener<Object, Object> kafkaProducerListener,
            ObjectProvider<RecordMessageConverter> messageConverter) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }

Spring Boot will create a KafkaTemplate bean for you if you don't define your own. This auto-configured bean depends on a ProducerFactory<Object, Object> bean, but you declared a ProducerFactory<String, Movie>. As you can see, the types don't match, which is why you got the error.
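
For illustration, one way to satisfy that dependency would be to keep the configuration from the question but declare the factory with the Object-typed generics the auto-configuration expects (a minimal sketch; the auto-configured kafkaTemplate bean can then be wired with it):

    @Bean
    public ProducerFactory<Object, Object> producerFactory() {
        Map<String, Object> props = kafkaProperties.buildProducerProperties();
        populateCommonProperties(props);
        // Same configuration as in the question; only the generics change so that the
        // auto-configured kafkaTemplate(ProducerFactory<Object, Object>, ...) method can inject it
        return new DefaultKafkaProducerFactory<>(props);
    }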


the reason I did it this way is that the ListenableFuture when using KafkaTemplate only provides the exception on failures (and I wanted to register callbacks as a separate reusable class across all my use cases)

In your case, you can still get the advantages of using KafkaTemplate. Instead of implementing a Callback, you can implement your own ProducerListener<K, V> and bind it to your KafkaTemplate. E.g.:

FullLoggingProducerListener.class

@Slf4j
public class FullLoggingProducerListener<K, V> implements ProducerListener<K, V> {
    @Override
    public void onSuccess(ProducerRecord<K, V> record, RecordMetadata recordMetadata) {
        log.info("Successful!");
    }

    @Override
    public void onError(ProducerRecord<K, V> record, @Nullable RecordMetadata recordMetadata, Exception exception) {
        log.error("Error!");
    }
}

YourConfiguration.class

    @Bean
    public KafkaTemplate<String, Movie> kafkaTemplate(ProducerFactory<String, Movie> kafkaProducerFactory,
            ProducerListener<String, Movie> kafkaProducerListener) {
        KafkaTemplate<String, Movie> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        return kafkaTemplate;
    }

Now, every time you use KafkaTemplate to send a record, you'll see the log.
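
A sketch of wiring it up (the bean name below is illustrative, assuming the Movie type from the question):

    @Bean
    public ProducerListener<String, Movie> kafkaProducerListener() {
        // Registered as a bean so the kafkaTemplate(...) method above can inject it
        return new FullLoggingProducerListener<>();
    }

With that in place, a plain kafkaTemplate.send(movieTopicName, movie) call will invoke the listener's onSuccess or onError for every record.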

Upvotes: 5

M. Deinum

Reputation: 124924

Taking a closer look at your exception stacktrace reveals the issue.

Error creating bean with name 'kafkaTemplate' defined in class path resource
 [org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.class]: 
Unsatisfied dependency expressed through method 'kafkaTemplate' parameter 0; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.core.ProducerFactory' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}

The error comes from not being able to apply the Kafka-based auto-configuration in Spring Boot. The class KafkaAutoConfiguration expects and configures some beans, and backs off if certain ones are found. Because you are configuring only some of those beans yourself, the auto-configuration partially backs off and thus fails to configure the remaining Kafka classes.

To fix this, you can exclude the KafkaAutoConfiguration. You can do this in your @SpringBootApplication annotation, like so:

@SpringBootApplication(exclude = {KafkaAutoConfiguration.class})

Or you can utilize the auto-configuration, let Spring Boot do the configuration, and use the provided KafkaTemplate or ProducerFactory to do what you want.

The latter would simplify your own configuration. I know too little about the Kafka auto-configuration and your use case to provide a more helpful code snippet, but you should be able to figure that out yourself, or just exclude the KafkaAutoConfiguration and go with what you have now.
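
A rough sketch of that second option (assuming the custom producer settings such as client.id, retry.backoff.ms and request.timeout.ms are moved into spring.kafka.producer.* properties so the auto-configured ProducerFactory picks them up; the bean name is illustrative):

    @Bean
    public KafkaProducer<String, Movie> movieKafkaProducer(ProducerFactory<Object, Object> producerFactory) {
        // Reuse the auto-configured factory's properties instead of defining your own ProducerFactory bean,
        // mirroring the producerFactory().getConfigurationProperties() call from the question
        return new KafkaProducer<>(producerFactory.getConfigurationProperties());
    }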

Upvotes: 2
