Reputation: 4719
I have a working prototype Spring Boot application which listens on a Kafka queue. Apart from the configuration in application.yml
, all that is required is a MessageListener
implementation annotated with @KafkaListener
.
Am now introducing Spring Integration, and to do so have configured these beans:
@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
adapter(KafkaMessageListenerContainer<String, String> container) {
KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
new KafkaMessageDrivenChannelAdapter<>(container);
kafkaMessageDrivenChannelAdapter.setOutputChannel(receiver());
return kafkaMessageDrivenChannelAdapter;
}
@Bean
public KafkaMessageListenerContainer<String, String> container() throws Exception {
ContainerProperties properties = new ContainerProperties(this.topic);
// set more properties
return new KafkaMessageListenerContainer<>(consumerFactory(), properties);
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = ...; // set proerties
return new DefaultKafkaConsumerFactory<>(props);
}
The application is not starting, and is throwing this error:
Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found.
- Bean method 'kafkaConsumerFactory' in 'KafkaAutoConfiguration' not loaded because @ConditionalOnMissingBean (types: org.springframework.kafka.core.ConsumerFactory; SearchStrategy: all) found bean 'consumerFactory'
Action:
Consider revisiting the conditions above or defining a bean of type 'org.springframework.kafka.core.ConsumerFactory' in your configuration.
This is even though I have defined a ConsumerFactory
bean.
Running in debug mode, it is apparent that Boot is loading a KafkaListenerEndpointContainer
bean to listen on the broker:
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : Discovered coordinator localhost:9092 (id: 2147483999 rack: null) for group my_group.
Then:
KafkaAnnotationDrivenConfiguration matched:
- @ConditionalOnClass found required class 'org.springframework.kafka.annotation.EnableKafka'; @ConditionalOnMissingClass did not find unwanted class (OnClassCondition)
KafkaAnnotationDrivenConfiguration#kafkaListenerContainerFactory matched:
- @ConditionalOnMissingBean (names: kafkaListenerContainerFactory; SearchStrategy: all) did not find any beans (OnBeanCondition)
KafkaAnnotationDrivenConfiguration#kafkaListenerContainerFactoryConfigurer matched:
- @ConditionalOnMissingBean (types: org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer; SearchStrategy: all) did not find any beans (OnBeanCondition)
KafkaAnnotationDrivenConfiguration.EnableKafkaConfiguration matched:
- @ConditionalOnMissingBean (names: org.springframework.kafka.config.internalKafkaListenerAnnotationProcessor; SearchStrategy: all) did not find any beans (OnBeanCondition)
KafkaAutoConfiguration matched:
- @ConditionalOnClass found required class 'org.springframework.kafka.core.KafkaTemplate'; @ConditionalOnMissingClass did not find unwanted class (OnClassCondition)
KafkaAutoConfiguration#kafkaProducerFactory matched:
- @ConditionalOnMissingBean (types: org.springframework.kafka.core.ProducerFactory; SearchStrategy: all) did not find any beans (OnBeanCondition)
KafkaAutoConfiguration#kafkaProducerListener matched:
- @ConditionalOnMissingBean (types: org.springframework.kafka.support.ProducerListener; SearchStrategy: all) did not find any beans (OnBeanCondition)
KafkaAutoConfiguration#kafkaTemplate matched:
- @ConditionalOnMissingBean (types: org.springframework.kafka.core.KafkaTemplate; SearchStrategy: all) did not find any beans (OnBeanCondition)
I think what is happening is that the Spring Boot\Kafa auto configuration is clashing with the Spring Integration\Kafka setup. What is the correct way to resolve this?
Thanks
Upvotes: 4
Views: 10487
Reputation: 174789
You can either use Boot's Consumer factory...
@Bean
public KafkaMessageListenerContainer<String, String> container(ConsumerFactory<String, String> cf) {
...
}
Or disable kafka auto configuration
@SpringBootApplication(exclude = KafkaAutoConfiguration.class)
Upvotes: 9