Reputation: 151
I am using spring-boot-starter-parent version 1.5.0.RELEASE, spring-kafka version 1.0.0.RELEASE and spring-kafka-test version 1.0.0.RELEASE in an application which consumes messages from a Kafka 0.9 cluster. I have a unit test for my consumer which uses KafkaEmbedded, but it fails because the broker port is picked randomly. Is there a way I can set this broker property without changing versions? If not, which versions should I use so as not to break anything?
Here is the code for the KafkaListener and KafkaConsumerTest.
@Service
public class Listener {
    private static final Logger logger = LoggerFactory.getLogger(Listener.class);
    private CountDownLatch latch = new CountDownLatch(1);

    @KafkaListener(topics = "topic", group = "group", containerFactory = "kafkaListenerContainerFactory")
    public void consumeClicks(@Payload String msg, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, @Header(KafkaHeaders.OFFSET) Integer offset, Acknowledgment ack) throws Exception {
        logger.info(msg);
        latch.countDown();
        ack.acknowledge();
    }

    public CountDownLatch getLatch() {
        return latch;
    }
}
@DirtiesContext
@SpringBootTest(classes = {SpringApplication.class})
@RunWith(SpringRunner.class)
public class KafkaConsumerTest {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerTest.class);
    private static String TEST_TOPIC = "topic";

    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TEST_TOPIC);

    public KafkaTemplate<String, String> template;

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Autowired
    private Listener listener;

    @Before
    public void init(){
        System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
        Map<String, Object> senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString());
        senderProps.put("key.serializer", StringSerializer.class);
        ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps);
        template = new KafkaTemplate<>(producerFactory);
        template.setDefaultTopic(TEST_TOPIC);
    }

    @Test
    public void testConsume() throws Exception {
        String record = "message";
        template.sendDefault(TEST_TOPIC, record);
        logger.debug("test-consume sent record {}", record);
        listener.getLatch().await(1000, TimeUnit.MILLISECONDS);
        Assert.assertEquals(listener.getLatch().getCount(), 0);
    }
}
Upvotes: 2
Views: 11860
Reputation: 40078
I think that when the application context is loaded for the test, two beans of each type (ProducerFactory and KafkaTemplate) are being created: one with the original configuration and one with the test configuration. Try using a separate profile for tests (application-test.yml) and set the bean overriding property spring.main.allow-bean-definition-overriding to true, so that the test beans override the application beans. Also declare ProducerFactory and KafkaTemplate as beans in the test, with the same names as in the application.
Upvotes: 0
Reputation: 174729
Please use spring-kafka 1.3.9 with boot 1.5; earlier versions are no longer supported. The current boot 1.5.x version is 1.5.21.
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TEST_TOPIC);

static {
    embeddedKafka.setKafkaPorts(1234);
}
setKafkaPorts has been available since 1.3.
However, you are properly using the allocated random port in your test:
Map<String, Object> senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString());
To get the Kafka listener to connect to the embedded broker, you can use:
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
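One point worth checking is when that property is set. With SpringRunner the application context is normally created before any @Before method runs, so setting the property in a static @BeforeClass method (which runs after the @ClassRule has started the broker) is likely the safer place. The sketch below uses only the JDK and shows the set-then-restore pattern in isolation. The class BootstrapServersProperty and the method withBrokers are hypothetical names invented for this illustration and are not part of spring-kafka; in a real test the broker string would come from embeddedKafka.getBrokersAsString().

```java
import java.util.function.Supplier;

// Hypothetical helper, not part of spring-kafka: sets the Boot property
// for the duration of a block and restores the previous value afterwards.
final class BootstrapServersProperty {
    static final String KEY = "spring.kafka.bootstrap-servers";

    private BootstrapServersProperty() {}

    static <T> T withBrokers(String brokers, Supplier<T> body) {
        String previous = System.getProperty(KEY);
        System.setProperty(KEY, brokers);
        try {
            // Anything that reads the property inside the body sees the new value.
            return body.get();
        } finally {
            // Restore the earlier state so other tests are not affected.
            if (previous == null) {
                System.clearProperty(KEY);
            } else {
                System.setProperty(KEY, previous);
            }
        }
    }
}
```

Restoring the old value afterwards keeps the setting from leaking into other test classes running in the same JVM.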
Upvotes: 0