Reputation: 1242
I have a simple Kafka consumer working in Spring. I have a config class that defines a bunch of factories, etc. When I remove the config class, the consumer still works. I'm wondering what the benefit of declaring the factory myself is, i.e.:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    return factory;
}

public ConsumerFactory<String, GenericRecord> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(retrieveConsumerConfigs());
}
versus just passing the values in via application properties and calling it a day. I have explicit control over the config in the class-based approach, but I was also thinking I could drop the class and supply the values through Spring environment properties such as spring.kafka.bootstrap-servers.
Upvotes: 0
Views: 4496
Reputation: 5729
The simple consumer works because Spring Boot auto-configuration, under the hood, creates a ConcurrentKafkaListenerContainerFactory and registers it with the Spring container.
You can verify this by injecting the KafkaListenerContainerFactory implementation, as done below:
@RestController
public class EmployeeController {

    private final KafkaListenerContainerFactory kafkaListenerContainerFactory;

    @Autowired
    public EmployeeController(KafkaListenerContainerFactory kafkaListenerContainerFactory) {
        System.out.println(kafkaListenerContainerFactory instanceof ConcurrentKafkaListenerContainerFactory);
        this.kafkaListenerContainerFactory = kafkaListenerContainerFactory;
    }
}
But if you are not happy with Spring Boot's auto-configured bean, you can create your own and register it with the Spring container using the @Bean annotation.
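The "use the default unless a bean is already there" behaviour described above can be pictured with a toy model. This is a plain-Java sketch, not Spring code: Registry, registerIfMissing and resolve are hypothetical names made up for illustration, and the strings stand in for real factory objects.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

public class BackOffSketch {

    // Toy stand-in for a bean container; hypothetical, not a Spring class.
    static final class Registry {
        private final Map<String, Object> beans = new LinkedHashMap<>();

        void register(String name, Object bean) {
            beans.put(name, bean);
        }

        // The default is created only when nothing is registered under that name.
        void registerIfMissing(String name, Supplier<Object> defaultBean) {
            beans.computeIfAbsent(name, key -> defaultBean.get());
        }

        Object get(String name) {
            return beans.get(name);
        }
    }

    // Returns which factory ends up in the registry.
    static String resolve(boolean userDeclaresFactory) {
        Registry registry = new Registry();
        if (userDeclaresFactory) {
            // Corresponds to declaring your own @Bean.
            registry.register("kafkaListenerContainerFactory", "user-defined factory");
        }
        // Corresponds to the auto-configuration step, which backs off if a bean exists.
        registry.registerIfMissing("kafkaListenerContainerFactory", () -> "auto-configured factory");
        return (String) registry.get("kafkaListenerContainerFactory");
    }

    public static void main(String[] args) {
        System.out.println(resolve(false));
        System.out.println(resolve(true));
    }
}
```

With no user-declared factory the default is used; once you declare your own, the default is never created.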
Upvotes: 3
Reputation: 174689
The container factory is required for @KafkaListener methods.
Spring Boot will auto-configure one (from application.properties/yml) if you don't provide your own bean. See KafkaAutoConfiguration.
Boot will also configure the consumer factory (if you don't).
An application, typically, does not need to declare any infrastructure beans.
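As a rough sketch, a property-only setup roughly equivalent to the factory in the question might look like the following. The broker address, group id and schema registry URL are placeholder values, and the Avro deserializer line assumes the Confluent deserializer is on the classpath; the question does not say which one is used.

```properties
# Placeholder broker address and group id
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=example-group

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Assumption: Confluent Avro deserializer for GenericRecord values
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
# Arbitrary client properties are passed through under spring.kafka.properties.*
spring.kafka.properties.schema.registry.url=http://localhost:8081

# Counterpart of factory.setBatchListener(true)
spring.kafka.listener.type=batch
```

Anything not covered by a property like these is where a customizer or your own bean comes in.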
EDIT
I prefer to never declare my own infrastructure beans. If I need some feature that is not exposed as a Boot property, or where I want to override some property for just one container, I simply add a customizer bean.
@Component
class Customizer {

    public Customizer(ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
        factory.setContainerCustomizer(container -> {
            // Constant-first equals avoids a NullPointerException when no group id is set
            if ("slowGroup".equals(container.getContainerProperties().getGroupId())) {
                container.getContainerProperties().setIdleBetweenPolls(60_000);
            }
        });
    }
}
or
@Component
class Customizer {

    Customizer(AbstractKafkaListenerContainerFactory<?, ?, ?> containerFactory,
            ThreadPoolTaskExecutor exec) {
        containerFactory.getContainerProperties().setConsumerTaskExecutor(exec);
    }
}
etc.
Upvotes: 2