user2587669
user2587669

Reputation: 602

required a bean of type 'org.springframework.kafka.core.KafkaTemplate' that could not be found

I'm using spring boot, apache kafka.

enter image description here

Below is my controller code

package com.infy.controller;

@RestController
@RequestMapping(value = "/serving/")
public class ServingRequestWebControllerImpl implements ServingRequestWebController {

    private static final Logger logger = LoggerFactory.getLogger(ServingRequestWebControllerImpl.class);

    @Autowired
    KafkaTemplate<String, TaskDetailsEntity> kafkaJsontemplate;

    String TOPIC_NAME = "sample-topic";

    @Override
    @PostMapping(value = "/produce", consumes = { "application/json" }, produces = { "application/json" })
    public String taskQueue(TaskDetailsEntity taskgDetailsEntity) {
        kafkaJsontemplate.send(TOPIC_NAME, taskgDetailsEntity);
        return "Serving Request Published Successfully To:- " + TOPIC_NAME;
    }
}

Below is the Kafka configuration code

package com.infy.config;
@Configuration
public class KafkaConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory(config);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }

    public ProducerFactory<String, ServingDetailsEntity> producerFactoryServingDetail() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory(config);
    }

    @Bean
    public KafkaTemplate<String, ServingDetailsEntity> kafkaTemplateServingDetailsListener() {
        return new KafkaTemplate<String, ServingDetailsEntity>(producerFactoryServingDetail());
    }

    @Bean
    public ConsumerFactory<String, ServingDetailsEntity> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "event-group");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
                new JsonDeserializer<>(ServingDetailsEntity.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ServingDetailsEntity> kafkaTemplateTaskDetailsListener() {
        ConcurrentKafkaListenerContainerFactory<String, ServingDetailsEntity> factory = new ConcurrentKafkaListenerContainerFactory<String, ServingDetailsEntity>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Below is my Consumer code

package com.infy.consumer;
@Service
public class KafkaConsumerService {

    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerService.class);

    @Autowired
    private ServingService service;

    @KafkaListener(topics = "sample-topic", groupId = "event-group",containerFactory = "kafkaTemplateTaskDetailsListener")
    public void consumeTask(TaskDetailsEntity taskDtls){
        System.out.println("Consumed Message:- \n "+taskDtls);

        TaskDetailsEntity upsert = service.taskUpsert(taskDtls);
        System.out.println(upsert.getId());
        logger.info("\n Exit KafkaConsumerService consumeTask");
    }
}

when i run my spring boot application i'm getting below error

Description: Field kafkaJsontemplate in com.infy.controller.ServingRequestWebControllerImpl required a bean of type 'org.springframework.kafka.core.KafkaTemplate' that could not be found. The injection point has the following annotations: - @org.springframework.beans.factory.annotation.Autowired(required=true) The following candidates were found but could not be injected: - Bean method 'kafkaTemplate' in 'KafkaAutoConfiguration' not loaded because auto-configuration 'KafkaAutoConfiguration' was excluded Action: Consider revisiting the entries above or defining a bean of type 'org.springframework.kafka.core.KafkaTemplate' in your configuration.

i tried by adding below configuration to application.properties, but no luck

spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration

below is my spring boot main code

package com.infy;

@SpringBootApplication
public class SpringBootInitializer {

    public static void main(String[] args) {

        SpringApplication.run(SpringBootInitializer.class, args);
    }
}

please someone help me, what mistake i have done.

Upvotes: 4

Views: 34683

Answers (1)

wroe
wroe

Reputation: 156

You want to auto-wire a bean of type KafkaTemplate<String, TaskDetailsEntity>, but you only declared beans of type KafkaTemplate<String, String> and KafkaTemplate<String, ServingDetailsEntity> in your KafkaConfig.

Probably also a mistake that producerFactory() has @Bean annotation and producerFactoryServingDetail() has not.

Upvotes: 14

Related Questions