Reputation: 1
When running this test class, I get:
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.<init>(LegacyKafkaConsumer.java:265)
at org.apache.kafka.clients.consumer.internals.ConsumerDelegateCreator.create(ConsumerDelegateCreator.java:65)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:600)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:595)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:543)
at com.lwo.tstlab.testproject.ControllerTest.testControllerSentMessage(ControllerTest.java:42)
at java.base/java.lang.reflect.Method.invoke(Method.java:578)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:103)
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:62)
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:58)
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.<init>(LegacyKafkaConsumer.java:184)
... 8 more
I assume the problem is that I did not set the bootstrap servers in the consumer configuration. Note that I am deliberately creating a local consumer instance: my application does not need a consumer @Bean configured at the context level, because it only queues messages. I also tried to inject the EmbeddedKafkaBroker bean, but I get: Could not autowire. No beans of 'EmbeddedKafkaBroker' type found.
So my question is: how do I configure the consumer correctly so that, with @EmbeddedKafka, I can check that the message reaches the topic when the controller is called? Here is my simple test class:
@EmbeddedKafka(topics = {"messages"})
@SpringBootTest
@AutoConfigureMockMvc
@EnableKafka
public class ControllerTest {

    @Autowired
    private MockMvc mockMvc;

    @Test
    public void testControllerSentMessage() throws Exception {
        mockMvc.perform(MockMvcRequestBuilders.get("/api/orders/kafka"))
                .andExpect(status().isCreated());

        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

        ConsumerRecord<String, Message> received = KafkaTestUtils.getSingleRecord(new KafkaConsumer<>(configProps), "messages");
        System.out.println(received.value().getText());
        assertEquals("test", received.value().getText());
    }
}
Reputation: 788
I had the same issue. You need to set the bootstrap-servers of your consumer to ${spring.embedded.kafka.brokers}.
It should look something like this in application-test.yml:
spring:
  ...
  kafka:
    ...
    consumer:
      bootstrap-servers: ${spring.embedded.kafka.brokers}
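Note that the YAML property above is read by Spring-managed consumers. A KafkaConsumer built by hand from a plain map, as in the question, most likely still needs the broker address put into that map itself. Below is a minimal sketch of one way to do that, assuming the embedded broker has published its address in the spring.embedded.kafka.brokers system property by the time the test method runs. The class name EmbeddedConsumerProps, the helper consumerProps and the group id are hypothetical names chosen for illustration; the keys are the standard Kafka consumer property names, and the deserializers are given as class-name strings so the sketch compiles without the Kafka jars.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds the config map for a locally created KafkaConsumer.
public class EmbeddedConsumerProps {

    // Assumption: @EmbeddedKafka exposes the broker list under this system property.
    static final String BROKERS_PROPERTY = "spring.embedded.kafka.brokers";

    static Map<String, Object> consumerProps(String groupId) {
        String brokers = System.getProperty(BROKERS_PROPERTY);
        if (brokers == null || brokers.isBlank()) {
            throw new IllegalStateException(
                    BROKERS_PROPERTY + " is not set; is the embedded broker running?");
        }
        Map<String, Object> props = new HashMap<>();
        // The setting missing from the question's map ("No resolvable bootstrap urls").
        props.put("bootstrap.servers", brokers);
        // A group id is needed for a subscribing consumer.
        props.put("group.id", groupId);
        // The message is sent before the consumer starts, so read from the beginning.
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.JsonDeserializer");
        return props;
    }
}
```

In the test, the map would then replace configProps, for example new KafkaConsumer<>(EmbeddedConsumerProps.consumerProps("test-group")). Depending on how the producer serializes Message, the JsonDeserializer may need further settings (such as trusted packages) that are not shown here.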