AlphaEpsilon

Reputation: 151

How to set port in KafkaEmbedded when unit testing spring-kafka consumer

I am using spring-boot-starter-parent 1.5.0.RELEASE, spring-kafka 1.0.0.RELEASE, and spring-kafka-test 1.0.0.RELEASE in an application that consumes messages from a Kafka 0.9 cluster. I have a unit test for my consumer that uses KafkaEmbedded, but it fails because the broker port is chosen at random. Is there a way to set this broker property without changing versions? If not, which versions should I use so that nothing breaks?

Here is the code for the KafkaListener and KafkaConsumerTest.

Listener.java

@Service
public class Listener {

    private static final Logger logger = LoggerFactory.getLogger(Listener.class);
    private CountDownLatch latch = new CountDownLatch(1);

    @KafkaListener(topics = "topic", group = "group", containerFactory = "kafkaListenerContainerFactory")
    public void consumeClicks(@Payload String msg, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, @Header(KafkaHeaders.OFFSET) Integer offset, Acknowledgment ack) throws Exception {
        logger.info(msg);
        latch.countDown();
        ack.acknowledge();
    }

    public CountDownLatch getLatch() {
        return latch;
    }
}

KafkaConsumerTest.java (EDIT)

@DirtiesContext
@SpringBootTest(classes = {SpringApplication.class})
@RunWith(SpringRunner.class)
public class KafkaConsumerTest {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerTest.class);
    private static String TEST_TOPIC = "topic";

    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TEST_TOPIC);

    public KafkaTemplate<String, String> template;

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Autowired
    private Listener listener;

    @Before
    public void init(){
        System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
        Map<String, Object> senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString());
        senderProps.put("key.serializer", StringSerializer.class);
        ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps);
        template = new KafkaTemplate<>(producerFactory);
        template.setDefaultTopic(TEST_TOPIC);
    }

    @Test
    public void testConsume() throws Exception {
        String record = "message";
        template.sendDefault(TEST_TOPIC, record);
        logger.debug("test-consume sent record {}", record);
        listener.getLatch().await(1000, TimeUnit.MILLISECONDS);
        Assert.assertEquals(listener.getLatch().getCount(), 0);
    }
}

Upvotes: 2

Views: 11860

Answers (2)

Ryuzaki L

Reputation: 40078

I think that when the application context is loaded for the test, two beans of each type (ProducerFactory and KafkaTemplate) are created: one with the original configuration and one with the test configuration. Try using a separate profile for tests (application-test.yml) and setting the bean-overriding property spring.main.allow-bean-definition-overriding to true.

That way the test beans override the application beans. Also declare ProducerFactory and KafkaTemplate as beans in the test, with the same names as in the application.

Upvotes: 0

Gary Russell

Reputation: 174729

Please use spring-kafka 1.3.9 with boot 1.5; earlier versions are no longer supported. The current boot 1.5.x version is 1.5.21.

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TEST_TOPIC);

static {
    embeddedKafka.setKafkaPorts(1234);
}

setKafkaPorts has been available since 1.3.

However, you are already using the allocated random port correctly in your test:

Map<String, Object> senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString());

To get the Kafka listener to connect to the embedded broker, you can use:

System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
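
One thing that may matter here is when the property is set. This is an assumption about the test setup, not something confirmed in the question: if the listener's configuration is resolved while the application context starts, a System.setProperty call that runs later (for example in a @Before method) would not be seen. The plain-Java sketch below only illustrates that ordering. ConsumerSettings is a hypothetical stand-in for startup-time configuration, and the address 127.0.0.1:54321 and the localhost:9092 fallback are made-up illustration values.

```java
public class BootstrapPropertyOrdering {

    public static final String KEY = "spring.kafka.bootstrap-servers";

    /** Hypothetical stand-in for configuration that is resolved once at startup. */
    public static final class ConsumerSettings {
        private final String bootstrapServers;

        public ConsumerSettings() {
            // Read the property exactly once, at construction time.
            // The fallback value is an assumption for this sketch.
            this.bootstrapServers = System.getProperty(KEY, "localhost:9092");
        }

        public String bootstrapServers() {
            return bootstrapServers;
        }
    }

    public static void main(String[] args) {
        System.clearProperty(KEY);

        // Settings created first, property set afterwards: the change is not seen.
        ConsumerSettings tooLate = new ConsumerSettings();
        System.setProperty(KEY, "127.0.0.1:54321");
        System.out.println(tooLate.bootstrapServers());

        // Settings created after the property is set: the new address is used.
        ConsumerSettings inTime = new ConsumerSettings();
        System.out.println(inTime.bootstrapServers());
    }
}
```

In a JUnit 4 test, the equivalent would probably be to make the System.setProperty call from a static @BeforeClass method, which runs after the @ClassRule has started the embedded broker.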

Upvotes: 0
