Reputation: 557
Does Spring Boot always require creating a bean of type KafkaTemplate
?
Details/stacktrace/codebase below, please tell me if what I am doing is incorrect.
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord<K, V>, Callback)
in order to send the message and also create a callbackListenablefuture
when using KafkaTemplate
only provides exception on failures( and i wanted to register callbacks as a separate reusable class across all my usecases)KafkaTemplate
with the following error:Caused by: org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'kafkaTemplate' defined in class path resource [org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.class]: Unsatisfied dependency expressed through method 'kafkaTemplate' parameter 0; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.core.ProducerFactory<java.lang.Object, java.lang.Object>' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}
at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:800) ~[spring-beans-5.3.12.jar:5.3.12]
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:541) ~[spring-beans-5.3.12.jar:5.3.12]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1352) ~[spring-beans-5.3.12.jar:5.3.12]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1195) ~[spring-beans-5.3.12.jar:5.3.12]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:582) ~[spring-beans-5.3.12.jar:5.3.12]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:542) ~[spring-beans-5.3.12.jar:5.3.12]
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:335) ~[spring-beans-5.3.12.jar:5.3.12]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234) ~[spring-beans-5.3.12.jar:5.3.12]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333) ~[spring-beans-5.3.12.jar:5.3.12]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208) ~[spring-beans-5.3.12.jar:5.3.12]
at org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:276) ~[spring-beans-5.3.12.jar:5.3.12]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1380) ~[spring-beans-5.3.12.jar:5.3.12]
at org.springframework.beans.factory.support.DefaultListableBeanFactory$DependencyObjectProvider.getIfUnique(DefaultListableBeanFactory.java:2063) ~[spring-beans-5.3.12.jar:5.3.12]
at org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration.<init>(KafkaAnnotationDrivenConfiguration.java:90) ~[spring-boot-autoconfigure-2.4.12.jar:2.4.12]
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:na]
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490) ~[na:na]
at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:211) ~[spring-beans-5.3.12.jar:5.3.12]
... 22 common frames omitted
Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.core.ProducerFactory<java.lang.Object, java.lang.Object>' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}
at org.springframework.beans.factory.support.DefaultListableBeanFactory.raiseNoMatchingBeanFound(DefaultListableBeanFactory.java:1790) ~[spring-beans-5.3.12.jar:5.3.12]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1346) ~[spring-beans-5.3.12.jar:5.3.12]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1300) ~[spring-beans-5.3.12.jar:5.3.12]
at org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:887) ~[spring-beans-5.3.12.jar:5.3.12]
at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:791) ~[spring-beans-5.3.12.jar:5.3.12]
... 40 common frames omitted
My Kafka config is below
@Configuration
public class KafkaEventConfig {
private final KafkaProperties kafkaProperties;
@Value("${client.id}")
private String clientId;
@Value("${topic.movie.name}")
private String movieTopicName;
@Value("${retry.backoff.ms}")
private int retryBackoffMilliseconds;
@Value("${request.timeout.ms}")
private int requestTimeoutMilliseconds;
public KafkaEventConfig(KafkaProperties kafkaProperties) {
this.kafkaProperties = kafkaProperties;
}
@Bean
public ProducerFactory<String, Movie> producerFactory() {
Map<String, Object> props = kafkaProperties.buildProducerProperties();
populateCommonProperties(props);
return new DefaultKafkaProducerFactory<>(props);
}
private void populateCommonProperties(Map<String, Object> props) {
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMilliseconds);
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMilliseconds);
}
@Bean
public KafkaProducer<String, Movie> movieKafkaProducer() {
return new KafkaProducer<String, Movie>(producerFactory().getConfigurationProperties());
}
@Bean
public KafkaProducerMonitor kafkaProducerMonitor(KafkaProducer<String, Movie> kafkaProducer,
MeterRegistry registry) {
return new KafkaProducerMonitor(kafkaProducer, registry, Tags.of("topic", movieTopicName));
}
My Kafka Callback is below
@Slf4j
public class KafkaProducerCallBack<K, V> implements Callback {
private ProducerRecord<K, V> producerRecord;
public KafkaProducerCallBack(ProducerRecord<K, V> producerRecord) {
this.producerRecord = producerRecord;
}
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
String topicName= metadata.topic();
long offset= metadata.offset();
if (exception != null) {
log.error("Failed to produce message [{}] to topic {} with exception {}", producerRecord, topicName, exception);
}
else {
log.info("Sucessfully published message [{}] to topic {} to offset {}", producerRecord, topicName , offset);
}
}
}
I publish messages like so
movieKafkaProducer.send(message, new KafkaProducerCallBack<String, Movie>(message));
Please note the moment i add the below lines in KafkaEventConfig
everything works fine
@Bean
public KafkaTemplate<String, Movie> movieKafkaTemplate() {
return new KafkaTemplate<String, Movie>(producerFactory());
}
Upvotes: 2
Views: 10018
Reputation: 1
<dependencies>
<!-- Spring Boot Starter for Apache Kafka -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-kafka</artifactId>
</dependency>
<!-- For using SSL with Kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.0</version> <!-- Make sure to use the latest version -->
</dependency>
</dependencies>
******************************************************
package com.example.kafkaconsumer.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "https://your-kafka-broker-url:port");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "your-group-id");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// SSL Configuration
props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "path/to/your/truststore.jks");
props.put("ssl.truststore.password", "your-truststore-password");
props.put("ssl.keystore.location", "path/to/your/keystore.jks");
props.put("ssl.keystore.password", "your-keystore-password");
props.put("ssl.key.password", "your-key-password");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentMessageListenerContainer<String, String> messageListenerContainer() {
ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(
consumerFactory(),
new ContainerProperties("your-topic")
);
container.setMessageListener((MessageListener<String, String>) record -> {
System.out.println("Received: " + record.value());
});
return container;
}
}
******************************************************
# Kafka properties
spring.kafka.bootstrap-servers=https://your-kafka-broker-url:port
spring.kafka.consumer.group-id=your-group-id
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# SSL properties
spring.kafka.ssl.truststore.location=classpath:truststore.jks
spring.kafka.ssl.truststore.password=your-truststore-password
spring.kafka.ssl.keystore.location=classpath:keystore.jks
spring.kafka.ssl.keystore.password=your-keystore-password
spring.kafka.ssl.key.password=your-key-password
******************************************************
******************************************************
Upvotes: 0
Reputation: 990
Additional to the latter that @M.Deinum mentioned:
Take a look at the KafkaAutoConfiguration
class:
@Bean
@ConditionalOnMissingBean(KafkaTemplate.class)
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
ProducerListener<Object, Object> kafkaProducerListener,
ObjectProvider<RecordMessageConverter> messageConverter) {
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
kafkaTemplate.setProducerListener(kafkaProducerListener);
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
return kafkaTemplate;
}
Springboot will create a KafkaTemplate
bean for you if you don't create your own. This auto-configured bean depends on ProducerFactory<Object, Object>
bean, and because you declared a ProducerFactory<String, Movie>
. As you could see the type wasn't fit, that's why you got an error.
the reason i did this way is because listenablefuture when using KafkaTemplate only provides exception on failures( and i wanted to register callbacks as a separate reusable class across all my usecases)
Your case, you can still get the advantages of using KafkaTemplate
. Instead of implementing a Callback
, you can implement your own ProducerListener<K, V>
and bind it into your KafkaTemple
. E.g:
FullLoggingProducerListener.class
public class FullLoggingProducerListener<K, V> implements ProducerListener<K, V> {
@Override
public void onSuccess(ProducerRecord<K, V> record, RecordMetadata recordMetadata) {
log.info("Successful!");
}
@Override
public void onError(ProducerRecord<K, V> record, @Nullable RecordMetadata recordMetadata, Exception exception) {
log.error("Error!");
}
}
YourConfigration.class
@Bean
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<String, Movie> kafkaProducerFactory, ProducerListener<Object, Object> kafkaProducerListener) {
KafkaTemplate<String, Movie> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);
kafkaTemplate.setProducerListener(kafkaProducerListener);
return kafkaTemplate;
}
Now, everytime you use KafkaTemplate
to send a record, you'll see the log.
Upvotes: 5
Reputation: 124924
Taking a closer look at your exception stacktrace reveals the issue.
Error creating bean with name 'kafkaTemplate' defined in class path resource
[org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.class]:
Unsatisfied dependency expressed through method 'kafkaTemplate' parameter 0; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.core.ProducerFactory' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}
The error comes from not being able to apply the Kafka based auto-configuration in Spring Boot. The class KafkaAutoConfiguration
expects and configures some beans, and backs off if certain ones are found. As you are configuring some of the beans, this will partially backoff and thus fail to auto-configure the Kafka classes.
To fix you can either exclude the KafkaAutoConfiguration
. You can do this in your @SpringBootApplication
annotation, like so
@SpringBootApplication(exclude={KafkaAutoConfiguration.class}
Or you can utilize the auto-configuration and let Spring Boot do the configuration and you use the provided KafkaTemplate
or ProducerFactory
to do what you want.
The latter would simplify your own configuration. I'm know too little of the Kafka auto-configuration and your usecase to provide a more helpful code snippet, but you should be able to figure that out yourself, or just exclude the KafkaAutoConfiguration
and go with what you have now.
Upvotes: 2