Coov Show

Reputation: 1

@EmbeddedKafka testing in Spring Boot

When running this test class, I get:

org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.<init>(LegacyKafkaConsumer.java:265)
at org.apache.kafka.clients.consumer.internals.ConsumerDelegateCreator.create(ConsumerDelegateCreator.java:65)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:600)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:595)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:543)
at com.lwo.tstlab.testproject.ControllerTest.testControllerSentMessage(ControllerTest.java:42)
at java.base/java.lang.reflect.Method.invoke(Method.java:578)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:103)
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:62)
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:58)
    at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.<init>(LegacyKafkaConsumer.java:184)
    ... 8 more

I assume the problem is that I did not set bootstrap.servers in the consumer configuration. Note that I am deliberately creating a local consumer instance: my application only produces messages, so it has no need for a consumer @Bean in the application context. I also tried to inject the EmbeddedKafkaBroker bean, but I get: Could not autowire. No beans of 'EmbeddedKafkaBroker' type found.

So my question is: how do I configure the consumer correctly, so that with @EmbeddedKafka I can verify that a request to the controller puts the message on the topic? Here is my simple test class:

@EmbeddedKafka(topics = {"messages"})
@SpringBootTest
@AutoConfigureMockMvc
@EnableKafka
public class ControllerTest {

    @Autowired
    private MockMvc mockMvc;

    @Test
    public void testControllerSentMessage() throws Exception {
        mockMvc.perform(MockMvcRequestBuilders.get("/api/orders/kafka"))
                .andExpect(status().isCreated());
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        ConsumerRecord<String, Message> received = KafkaTestUtils.getSingleRecord(new KafkaConsumer<>(configProps), "messages");
        System.out.println(received.value().getText());
        assertEquals("test", received.value().getText());
    }
}

Upvotes: 0

Views: 36

Answers (1)

hfc

Reputation: 788

I had the same issue. You need to set the bootstrap-servers of your consumer to ${spring.embedded.kafka.brokers}, the property under which the embedded broker publishes its address.

In application-test.yml it should look something like this:

spring:
 ...
 kafka:
  ...
  consumer:
   bootstrap-servers: ${spring.embedded.kafka.brokers}
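Because the consumer in the question is built by hand with new KafkaConsumer<>(configProps) rather than from Spring's configuration, the same broker address also has to end up in that map. Below is a minimal sketch of one way to do that, assuming the embedded broker exposes its address through the spring.embedded.kafka.brokers system property as well. EmbeddedConsumerProps, its build method, and the "test-group" group id are hypothetical names introduced here for illustration; the configuration keys are written as plain strings so the snippet compiles without Kafka on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not from the question): builds consumer properties for a hand-made test consumer. */
final class EmbeddedConsumerProps {

    // Assumption: the embedded broker publishes its address list under this system property.
    static final String BROKERS_PROPERTY = "spring.embedded.kafka.brokers";

    static Map<String, Object> build(String groupId) {
        String brokers = System.getProperty(BROKERS_PROPERTY);
        if (brokers == null || brokers.isEmpty()) {
            // Fail early with a clearer message than "No resolvable bootstrap urls".
            throw new IllegalStateException(BROKERS_PROPERTY + " is not set; is the embedded broker running?");
        }
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", brokers);   // the setting missing from the question's map
        props.put("group.id", groupId);            // needed to subscribe to a topic
        props.put("auto.offset.reset", "earliest"); // read records sent before the consumer joined
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.springframework.kafka.support.serializer.JsonDeserializer");
        return props;
    }
}
```

In the test this would be used as new KafkaConsumer<>(EmbeddedConsumerProps.build("test-group")). Note that JsonDeserializer will probably also need to be told the target type and the trusted packages; those settings are left out here because the Message class's package is not shown in the question.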

Upvotes: 0
