Abbin Varghese
How to write Unit test for @KafkaListener?

Trying to figure out if I can write unit test for @KafkaListener using spring-kafka and spring-kafka-test.

My Listener class.

    public class MyKafkaListener {
    private MyMessageProcessor myMessageProcessor;

    @KafkaListener(topics = "${kafka.topic.01}", groupId = "SF.CLIENT", clientIdPrefix = "SF.01", containerFactory = "myMessageListenerContainerFactory")
    public void myMessageListener(MyMessage message) {
        myMessageProcessor.process(message);"MyMessage processed");

My Test class :

    @EmbeddedKafka(partitions = 1, topics = {"I1.Topic.json.001"})
    @ContextConfiguration(classes = {TestKafkaConfig.class})
    public class MyMessageConsumersTest {

    private MyMessageProcessor myMessageProcessor;

    private String TOPIC_01;

    private KafkaTemplate<String, MyMessage> messageProducer;

    public void testSalesforceMessageListner() {
        MyMessageConsumers myMessageConsumers = new MyMessageConsumers(mockService);
        messageProducer.send(TOPIC_01, "MessageID", new MyMessage());
        verify(myMessageProcessor, times(1)).process(any(MyMessage.class));

My Test config class :

    public class TestKafkaConfig {
    public MyMessageProcessor myMessageProcessor() {
        return mock(MyMessageProcessor.class);
    public KafkaEmbedded kafkaEmbedded() {
        return new KafkaEmbedded(1, true, 1, "I1.Topic.json.001");

    public ConsumerFactory<String, MyMessage> myMessageConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(MyMessage.class));
    public ConcurrentKafkaListenerContainerFactory<String, MyMessage> myMessageListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, MyMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
        return factory;

    public ProducerFactory<String, MyMessage> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString());
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaMessageSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    public KafkaTemplate<String, MyMessage> messageProducer() {
        return new KafkaTemplate<>(producerFactory());

Is there any simple way to make this work ?

Or should I do the testing of @KafkaListener in some other way ? In unit test, how do I ensure @KafkaListener is invoked when a new message is arrived in Kafka.

Ken Chan
In unit test, how do I ensure @KafkaListener is invoked when a new message is arrived in Kafka.

Instead of using Awaitility or CountDownLatch approach , a more easy way is to make the actual @KafkaListener bean as the mockito spy using @SpyBean. Spy basically allows you to record all interactions made on an actual bean instance such that you can verify its interactions later. Together with the timeout verify feature of the mockito , you can ensure that the verification will be done over and over until certain timeout after the producer send out the message.

Something like :

@SpringBootTest(properties = {"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}"})
@EmbeddedKafka(topics = {"fooTopic"})
public class MyMessageConsumersTest {

   private MyKafkaListener myKafkaListener;

   private ArgumentCaptor<MyMessage> myMessageCaptor;

   public void test(){

      //create KafkaTemplate to send some message to the topic...

       verify(myKafkaListener, timeout(5000)). myMessageListener(myMessageCaptor.capture());
      //assert the KafkaListener is configured correctly such that it is invoked with the expected parameter

If you want to write integration tests using EmbeddedKafka, then you can do something like this. Assume we have some KafkaListener, which accepts a RequestDto as a Payload.

In your test class you should create a TestConfiguration in order to create producer beans and to autowire KafkaTemplate into your test. Also notice, that instead of autowiring consumer, we inject a consumer SpyBean.

In someTest method we are creating a latch, and setting up the consumer listener method so that when it is called, the latch will be opened and assertions will take place only after the listener have received the Payload.

Also notice any() ?: RequestDto() line. You should use elvis operator with any() only if you are using Mockito's any() with non-null Kotlin method arguments, because any() firstly returns null.

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@EmbeddedKafka(partitions = 10, brokerProperties = ["listeners=PLAINTEXT://localhost:9092", "port=9092"])
class KafkaIgniteApplicationTests {

    private lateinit var consumer: Consumer

    class Config {

        private lateinit var servers: String

        fun producerConfig(): Map<String, Any> {
            val props = mutableMapOf<String, Any>()
            props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = servers
            props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] =
            props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] =
            return props

        fun producerFactory(): ProducerFactory<String, String> {
            return DefaultKafkaProducerFactory(producerConfig())

        fun kafkaTemplate(producerFactory: ProducerFactory<String, String>): KafkaTemplate<String, String> {
            return KafkaTemplate(producerFactory)

    private lateinit var kafkaTemplate: KafkaTemplate<String, String>

    fun someTest() {
        val lock = CountDownLatch(1)
        `when`(consumer.receive(any() ?: RequestDto())).thenAnswer {
        val request = "{\"value\":\"1\"}"
        kafkaTemplate.send(TOPIC, request)
        lock.await(1000, TimeUnit.MILLISECONDS)
        verify(consumer).receive(RequestDto().apply { value = BigDecimal.ONE })

Laura Liparulo
Here is my working solution for the Consumer, based on your code. Thank you :-)

The Configuration is the following:

public class KafkaTestConfig {

    private static Logger log = LoggerFactory.getLogger(KafkaTestConfig.class);

    private String bootstrapServers;

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);"Consumer TEST config = {}", props);
        return props;

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);"Producer TEST config = {}", props);
        return props;

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new JsonDeserializer<String>());

    public ProducerFactory<String, String> producerFactory() {
        DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(producerConfigs());
        return pf;

    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        return factory;

    public KafkaTemplate<String, String> kafkaTemplate() {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        return kafkaTemplate;

    public KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry() {
        KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry = new KafkaListenerEndpointRegistry();
        return kafkaListenerEndpointRegistry;

Place all the beans you need to include in the test in a different class:

public class KafkaBeansConfig {

    public MyProducer myProducer() {
        return new MyProducer();

    // more beans

I created a BaseKafkaConsumerTest class to reuse it :

@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
@ContextConfiguration(classes = KafkaTestConfig.class)
public class BaseKafkaConsumerTest {

    protected EmbeddedKafkaBroker embeddedKafka;

    private String brokerAddresses;

    protected KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    protected KafkaTemplate<String, String> senderTemplate;

    public void setUp() {
        embeddedKafka.brokerProperty("controlled.shutdown.enable", true);

        for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry
                .getListenerContainers()) {
            ContainerTestUtils.waitForAssignment(messageListenerContainer, embeddedKafka.getPartitionsPerTopic());

    public void tearDown() {
        for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry
                .getListenerContainers()) {

        embeddedKafka.getKafkaServers().forEach(b -> b.shutdown());
        embeddedKafka.getKafkaServers().forEach(b -> b.awaitShutdown());


Extend the base class to stest your consumer:

@EmbeddedKafka(topics = MyConsumer.TOPIC_NAME)
public class MYKafkaConsumerTest extends BaseKafkaConsumerTest {

    private static Logger log = LoggerFactory.getLogger(PaymentMethodsKafkaConsumerTest.class);

    private MyConsumer myConsumer;

    // mocks with @MockBean

    @ComponentScan({ "com.myfirm.kafka" })
    static class KafkaLocalTestConfig {

    public void setUp() {

    public void testMessageIsReceived() throws Exception {


    String jsonPayload = "{\"id\":\"12345\","cookieDomain\":"helloworld"}";
    ListenableFuture<SendResult<String, String>> future =
        senderTemplate.send(MyConsumer.TOPIC_NAME, jsonPayload);

    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

        public void onSuccess(SendResult<String, String> result) {
  "successfully sent message='{}' with offset={}", jsonPayload,

        public void onFailure(Throwable ex) {
            log.error("unable to send message='{}'", jsonPayload, ex);

    Mockito.verify(myService, Mockito.times(1))

As I read in other posts, don´t test the business logic this way. Just that the calls are made.

Gary Russell
You can wrap the listener in your test case.


public class So52783066Application {

    public static void main(String[] args) {, args);

    @KafkaListener(id = "so52783066", topics = "so52783066")
    public void listen(String in) {



public class So52783066ApplicationTests {

    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, "so52783066");

    private KafkaListenerEndpointRegistry registry;

    private KafkaTemplate<String, String> template;

    public void setup() {
        System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());

    public void test() throws Exception {
        ConcurrentMessageListenerContainer<?, ?> container = (ConcurrentMessageListenerContainer<?, ?>) registry
        AcknowledgingConsumerAwareMessageListener<String, String> messageListener = (AcknowledgingConsumerAwareMessageListener<String, String>) container
        CountDownLatch latch = new CountDownLatch(1);
                .setMessageListener(new AcknowledgingConsumerAwareMessageListener<String, String>() {

                    public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment,
                            Consumer<?, ?> consumer) {
                        messageListener.onMessage(data, acknowledgment, consumer);

        template.send("so52783066", "foo");
        assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();


Artem Bilan
how do I ensure @KafkaListener is invoked when a new message is arrived in Kafka.

Well, this is essentially a Framework responsibility to test such a functionality. In your case you need just concentrate on the business logic and unit test exactly your custom code, but not that one compiled in the Framework. In addition there is not goo point to test the @KafkaListener method which just logs incoming messages. It is definitely going to be too hard to find the hook for test-case verification.

On the other hand I really believe that business logic in your @KafkaListener method is much complicated than you show. So, it might be really better to verify your custom code (e.g. DB insert, some other service call etc.) called from that method rather than try to figure out the hook exactly for the myMessageListener().

What you do with the mock(MyMessageProcessor.class) is really a good way for business logic verification. Only what is wrong in your code is about that duplication for the EmbeddedKafka: you use an annotation and you also declare a @Bean in the config. You should think about removing one of them. Although it isn't clear where is your production code, which is really free from the embedded Kafka. Otherwise, if everything is in the test scope, I don't see any problems with your consumer and producer factories configuration. You definitely have a minimal possible config for the @KafkaListener and KafkaTemplate. Only what you need is to remove a @EmbeddedKafka do not start the broker twice.

