christophetd

Reputation: 3874

spring-kafka: Kafka consumer isn't receiving records if I define a record producer in my application

I'm writing a small PoC using Spring and Spring Kafka. My goal is to have both a producer and a consumer in the same application, writing to (resp. reading from) the same topic.

I'm having a weird situation where:

Below is my code - it is very similar to the examples of the documentation. More precisely, the problem is coming from the fact that the beans in KafkaConsumerConfiguration are not being created by Spring (i.e. the methods constructing them are never called).

Producer

KafkaProducerConfiguration.java

@Configuration
public class KafkaProducerConfiguration {

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:32768");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }
}

MessageSender.java

@Component
public class MessageSender {
    private static final Logger log = Logger.getLogger(MessageSender.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @PostConstruct
    public void onConstruct() throws InterruptedException {
        log.info("Sending messages...");
        for (int i = 0; i < 100; ++i) {
            kafkaTemplate.send("mytopic", "this is a message");
            Thread.sleep(1000);
        }
        kafkaTemplate.flush(); // NOTE: no changes if I move this call in the loop
        log.info("Done sending messages");
    }
}

Consumer

KafkaConsumerConfiguration.java

@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:32768");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-service");
        return props;
    }
}

MyMessageListener.java

@Service
public class MyMessageListener {
    private static final Logger log = Logger.getLogger(MyMessageListener.class);

    @PostConstruct
    public void onConstruct() {
        log.info("Message listener started");
    }

    @KafkaListener(topics = "mytopic")
    public void onMessageReceived(String message) {
        log.info("Got message: " + message);
    }
}

Here's the log generated by the application, for reference: https://pastebin.com/BY783jiL. As you can see, the consumer beans are not being created (otherwise there would be a "ConsumerConfig values: ..." block in the output).

Here are a few things I tried without success:

Versions: Spring Boot 1.5.9, Spring-Kafka: 1.1.7.

I've been tearing my hair out for a few hours now, any help appreciated.

Thanks!

Upvotes: 0

Views: 939

Answers (2)

Gary Russell

Reputation: 174729

kafkaTemplate.send("mytopic", "this is a message");

You should NEVER start interacting with external services in a @PostConstruct method - you need to wait for the application to be built before doing so.

Implement SmartLifecycle, return true for isAutoStartup() and move that code to start().

Or implement ApplicationListener<ContextRefreshedEvent> and do the sends when you get the event.

Either way will ensure that the application is ready.

Upvotes: 2

christophetd

Reputation: 3874

Just found the issue. MessageSender.onConstruct is actually taking a lot of time to execute (100 iterations with a 1-second sleep each, so about 100 seconds), and in the meantime it is preventing Spring from creating the other beans, including the ones in KafkaConsumerConfiguration.
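To make the mechanism concrete, here is a minimal, Spring-free sketch of what seems to be going on. It is only a toy model: StartupOrderDemo, Bean, Sender, Listener, postConstruct and onReady are all hypothetical names, not Spring APIs. The loop stands in for a container that initializes beans one after another on a single thread, and onReady stands in for a "context is ready" callback such as the ones suggested in the other answer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: none of these types are Spring classes.
final class StartupOrderDemo {

    /** Hypothetical bean contract with two hooks. */
    interface Bean {
        // Models a @PostConstruct method: runs right after this bean is initialized.
        default void postConstruct(List<String> created) {}
        // Models a "context ready" callback: runs once every bean is initialized.
        default void onReady(List<String> created) {}
    }

    static final class Sender implements Bean {
        private final boolean sendInPostConstruct;
        // Snapshot of which beans existed at the moment the sender did its work.
        final List<String> visibleAtSendTime = new ArrayList<>();

        Sender(boolean sendInPostConstruct) {
            this.sendInPostConstruct = sendInPostConstruct;
        }

        private void send(List<String> created) {
            visibleAtSendTime.addAll(created);
        }

        @Override
        public void postConstruct(List<String> created) {
            if (sendInPostConstruct) send(created);
        }

        @Override
        public void onReady(List<String> created) {
            if (!sendInPostConstruct) send(created);
        }
    }

    static final class Listener implements Bean {}

    static List<String> run(boolean sendInPostConstruct) {
        Sender sender = new Sender(sendInPostConstruct);
        // Initialization order: the sender happens to come first.
        List<Bean> beans = Arrays.asList(sender, new Listener());
        List<String> created = new ArrayList<>();
        for (Bean bean : beans) {
            created.add(bean.getClass().getSimpleName());
            // A slow hook here stalls the loop, so later beans have to wait.
            bean.postConstruct(created);
        }
        // Only now is the whole "application" built.
        for (Bean bean : beans) {
            bean.onReady(created);
        }
        return sender.visibleAtSendTime;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // prints [Sender]
        System.out.println(run(false)); // prints [Sender, Listener]
    }
}
```

In the first run the sender does its work while the listener does not exist yet, which matches what I saw in the log. In the second run the work is deferred until everything is initialized, which is the idea behind moving the sends out of @PostConstruct.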

Upvotes: 0
