MFEB
MFEB

Reputation: 151

Java Spring Kafka: multiple listeners

Currently trying to get two different listeners in the same project to consume. Following various other posts all yield different errors, but overall it seems that the beans are not properly defined.

KafkaConsumerConfig(PipelineMessage)



@EnableKafka
@Configuration
public class KafkaConsumerConfigPipelineMessage {
    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Value(value = "${spring.kafka.producer.group-id}")
    private String groupId;

    @Value(value = "${saslMechanism}")
    private String saslMechanism;

    @Value(value = "${kafkaUser}")
    private String kafkaUser;

    @Value(value = "${kafkaPassword}")
    private String kafkaPassword;

    @Value(value = "${securityProtocol}")
    private String securityProtocol;
    @Bean
    public ConsumerFactory<String, PipelineMessage> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroDeserializer.class);
        props.put("sasl.mechanism",saslMechanism);
        props.put("security.protocol",securityProtocol);
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username ='"+kafkaUser+"' password = '"+kafkaPassword+"';");
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new AvroDeserializer<>(PipelineMessage.class));
    }

    @Bean("kafkaListenerPipelineMessage")
    public ConcurrentKafkaListenerContainerFactory<String, PipelineMessage> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, PipelineMessage> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

KafkaConsumerConfig(Bridge)

@EnableKafka
@Configuration
public class KafkaConsumerConfigBridge {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Value(value = "${spring.kafka.producer.group-id}")
    private String groupId;

    @Value(value = "${saslMechanism}")
    private String saslMechanism;

    @Value(value = "${kafkaUser}")
    private String kafkaUser;

    @Value(value = "${kafkaPassword}")
    private String kafkaPassword;

    @Value(value = "${securityProtocol}")
    private String securityProtocol;

    @Bean
    public ConsumerFactory<String, Bridge> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroDeserializer.class);
        props.put("sasl.mechanism",saslMechanism);
        props.put("security.protocol",securityProtocol);
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username ='"+kafkaUser+"' password = '"+kafkaPassword+"';");
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new AvroDeserializer<>(Bridge.class));
    }

    @Bean("kafkaListenerBridge")
    public ConcurrentKafkaListenerContainerFactory<String, Bridge> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, Bridge> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Listeners

 @KafkaListener(id = "id1",topics = "${kafka.topic.consumer.name.rb}", groupId = "${spring.kafka.producer.group-id}", containerFactory = "kafkaListenerBridge")
    public void consumeBridgeForPayroll(Bridge message) throws IOException {

@KafkaListener(id = "id2",topics = "${kafka.topic.consumer.name.dlq}",groupId = "${spring.kafka.consumer.group-id}", containerFactory = "kafkaListenerPipelineMessage")
    public void consumeForMessages(PipelineMessage message) throws 

I tried moving the Bean name to the consumerFactory function but that did not help. With the current implementation the error indicates that the listener bean is of the wrong type.

After changing the names of the consumerFactory the following error occurred.

"timestamp":"2021-09-27T22:02:11.273Z","level":"ERROR","thread":"main","logger":"org.springframework.boot.SpringApplication","message":"Application run failed","context":"default","exception":"org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'kafkaListenerContainerFactory' defined in class path resource [org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.class]: Unsatisfied dependency expressed through method 'kafkaListenerContainerFactory' parameter 1; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.core.ConsumerFactory<java.lang.Object, java.lang.Object>' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}
at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:769)
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:509)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1321)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1160)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:555)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:515)
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:320)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:318)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:847)
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:877)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:549)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:742)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:389)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:311)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1213)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1202)
at com.employbridge.core.timecollectionservice.Application.main(Application.java:15)
Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.core.ConsumerFactory<java.lang.Object, java.lang.Object>' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}
at org.springframework.beans.factory.support.DefaultListableBeanFactory.raiseNoMatchingBeanFound(DefaultListableBeanFactory.java:1662)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1221)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1175)
at org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:857)
at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:760)
... 19 common frames omitted
"

Upvotes: 0

Views: 2447

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121272

You have two beans with the same name:

@Bean
public ConsumerFactory<String, PipelineMessage> consumerFactory() {

@Bean
public ConsumerFactory<String, Bridge> consumerFactory() {

Which override each other. Well, one of them will win in the end.

Even if you think you call a consumerFactory() method in the same class, it still delegates to the BeanFactory to return you a bean. And that might come to you from different definition. The best way for you to overcome the problem is to have those ConsumerFactory in different names. There is just enough to change the method name: it is used by default for bean name definitions.

Upvotes: 2

Related Questions