Reputation: 25842
I have the following Kafka Spring consumer configuration:
@Configuration
public class KafkaConsumerTestConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String consumerGroupId;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return props;
}
@Bean
public ConsumerFactory<String, ImportDecisionMessage> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(ImportDecisionMessage.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, ImportDecisionMessage> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, ImportDecisionMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
So far everything works well.
Now, I'd like another type of message, let's say Review
. I change my configuration to the following one:
@Configuration
public class KafkaConsumerTestConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String consumerGroupId;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return props;
}
@Bean
public ConsumerFactory<String, ImportDecisionMessage> importDecisionMessageConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(ImportDecisionMessage.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, ImportDecisionMessage> importDecisionMessageKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, ImportDecisionMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(importDecisionMessageConsumerFactory());
return factory;
}
@Bean
public ConsumerFactory<String, Review> reviewConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(Review.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Review> reviewKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Review> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(reviewConsumerFactory());
return factory;
}
}
and my Spring Boot application fails with the following exception:
Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found.
- Bean method 'kafkaConsumerFactory' in 'KafkaAutoConfiguration' not loaded because @ConditionalOnMissingBean (types: org.springframework.kafka.core.ConsumerFactory; SearchStrategy: all) found beans of type'org.springframework.kafka.core.ConsumerFactory'importDecisionMessageConsumerFactory
Action:
Consider revisiting the conditions above or defining a bean of type 'org.springframework.kafka.core.ConsumerFactory' in your configuration.
2017-11-23 18:19:50.578 ERROR 14264 --- [ main] o.s.test.context.TestContextManager : Caught exception while allowing TestExecutionListener [org.springframework.test.context.web.ServletTestExecutionListener@6989da5e] to prepare test instance [com.decisionwanted.domain.DecisionCharacteristicIT@535a6697]
java.lang.IllegalStateException: Failed to load ApplicationContext
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:125) ~[spring-test-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at org.springframework.test.context.support.DefaultTestContext.getApplicationContext(DefaultTestContext.java:107) ~[spring-test-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at org.springframework.test.context.web.ServletTestExecutionListener.setUpRequestContextIfNecessary(ServletTestExecutionListener.java:190) ~[spring-test-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at org.springframework.test.context.web.ServletTestExecutionListener.prepareTestInstance(ServletTestExecutionListener.java:132) ~[spring-test-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at org.springframework.test.context.TestContextManager.prepareTestInstance(TestContextManager.java:242) ~[spring-test-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.createTest(SpringJUnit4ClassRunner.java:227) [spring-test-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner$1.runReflectiveCall(SpringJUnit4ClassRunner.java:289) [spring-test-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) [junit-4.12.jar:4.12]
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.methodBlock(SpringJUnit4ClassRunner.java:291) [spring-test-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:246) [spring-test-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97) [spring-test-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) [junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) [junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) [junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) [junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) [junit-4.12.jar:4.12]
at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61) [spring-test-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70) [spring-test-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at org.junit.runners.ParentRunner.run(ParentRunner.java:363) [junit-4.12.jar:4.12]
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190) [spring-test-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86) [.cp/:na]
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) [.cp/:na]
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459) [.cp/:na]
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:678) [.cp/:na]
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382) [.cp/:na]
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192) [.cp/:na]
Caused by: org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'kafkaListenerContainerFactory' defined in class path resource [org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.class]: Unsatisfied dependency expressed through method 'kafkaListenerContainerFactory' parameter 1; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.core.ConsumerFactory<java.lang.Object, java.lang.Object>' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}
at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:723) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:458) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1249) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1098) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:545) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:502) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:312) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:228) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:310) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:756) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:868) ~[spring-context-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:549) ~[spring-context-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:751) ~[spring-boot-2.0.0.M6.jar:2.0.0.M6]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:387) ~[spring-boot-2.0.0.M6.jar:2.0.0.M6]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:327) ~[spring-boot-2.0.0.M6.jar:2.0.0.M6]
at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:138) ~[spring-boot-test-2.0.0.M6.jar:2.0.0.M6]
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:99) ~[spring-test-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:117) ~[spring-test-5.0.1.RELEASE.jar:5.0.1.RELEASE]
... 25 common frames omitted
Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.core.ConsumerFactory<java.lang.Object, java.lang.Object>' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}
at org.springframework.beans.factory.support.DefaultListableBeanFactory.raiseNoMatchingBeanFound(DefaultListableBeanFactory.java:1502) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1099) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1060) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:809) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:715) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE]
... 43 common frames omitted
What am I doing wrong and how to fix it ? Why do I need to configure this default kafkaListenerContainerFactory
? Is it possible to avoid it?
Upvotes: 3
Views: 10046
Reputation: 121462
You can exclude KafkaAutoConfiguration
from you Spring Boot configuration. Or call one of the KafkaListenerContainerFactory
with the proper bean name to satisfy the condition:
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
Upvotes: 4