How to Create an Integration Test for a Spring Kafka Listener

I have one microservice that sends a message to another microservice, which should consume it.

The Kafka configuration works and everything runs, but I need to create an integration test for this code and I have no idea how.

My Kafka consumer class, annotated with @Component:

@Component
public class KafkaReactionConsumerMessageComponent {

    private static final Logger logger = LoggerFactory.getLogger(KafkaReactionConsumerMessageComponent.class);

    private final ReactionsService reactionsService;

    public KafkaReactionConsumerMessageComponent(ReactionsService reactionsService) {
        this.reactionsService = reactionsService;
    }

    @KafkaListener(topics = "reaction-topic", clientIdPrefix = "string", groupId = "magpie-trending")
    public void consumingReactionMessages(ConsumerRecord<String, String> cr,
                                          @Payload String payload) {
        logger.info("[JSON] received Payload: {}", payload);

        try {
            ObjectMapper mapper = new ObjectMapper();
            ReactionMessage message = mapper.readValue(payload, ReactionMessage.class);
            if (StringUtils.equals("unloved", message.getReactionType())) {
                reactionsService.deleteReactionsByUserIdAndPostId(message.getPost().getPostId(), message.getUser().getUserId());
                logger.info("Deleted reactions from database with postId: {} and userId: {}", message.getPost().getPostId(), message.getUser().getUserId());
            } else {
                List<Reaction> reactions = creatReactions(message).stream()
                // ... (rest of the stream pipeline not shown)
                logger.info("Added reactions to database: {}", reactions);
            }
        } catch (Exception e) {
            // Pass the exception along so the cause is not lost
            logger.error("Cannot Deserialize payload to ReactionMessage", e);
        }
    }
}


My integration test is:

private static final String TOPIC = "reaction-topic";
private final Logger logger = LoggerFactory.getLogger(KafkaReactionMessageConsumerTest.class);

private final KafkaReactionConsumerMessageComponent kafkaReactionConsumerMessageComponent;
private final EmbeddedKafkaBroker embeddedKafkaBroker;

private Consumer<String, String> consumer;

public KafkaReactionMessageConsumerTest(KafkaReactionConsumerMessageComponent kafkaReactionConsumerMessageComponent,
                                        EmbeddedKafkaBroker embeddedKafkaBroker) {
    this.kafkaReactionConsumerMessageComponent = kafkaReactionConsumerMessageComponent;
    this.embeddedKafkaBroker = embeddedKafkaBroker;
}

@BeforeEach
public void setUp() {
    Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "true", embeddedKafkaBroker));
    consumer = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer()).createConsumer();
}

@AfterEach
public void tearDown() {
}

@Test
public void shouldConsumeAndInsertInDatabaseReactionDomain() {
    ReactionMessage reactionMessage = new ReactionMessage(new PostMessage("1", Set.of("a", "b", "c")),
            new UserMessage("2"), "loved");

    Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
    Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();

    producer.send(new ProducerRecord<>(TOPIC, "1", reactionMessage.toString()));

    assertEquals(3, mongoTemplate.getCollection("reactions").countDocuments());
}

The abstract class:

public abstract class AbstractMongoEmbeddedTest {

    private static MongodExecutable mongodExecutable;

    protected MongoTemplate mongoTemplate;

    private void dropPostCollection() {
        // ...
    }
}

Since you are using an embedded Kafka broker, you can simply produce to and consume from the desired topic(s) within your integration test.


Consuming can be done via a simple JUnit rule. A rule serving this purpose can be found here. Feel free to use it.

You can use it like this to assert consumed messages:



For producing messages, you can simply wire an org.springframework.kafka.core.KafkaTemplate into your integration test and send messages to a given topic.
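One thing to keep in mind when sending from the test: the listener consumes the record on its own thread, so an assertion placed directly after the send can run before the message has been processed. Below is a minimal sketch of a polling helper that waits for a condition before asserting. The names Await, until and AwaitDemo are hypothetical and not part of Spring Kafka, and the demo only simulates the listener with a background thread.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;

// Hypothetical helper (not a Spring Kafka class): polls a condition until it
// holds or the timeout elapses.
final class Await {
    private Await() {}

    // Returns true as soon as the condition holds, false if the timeout
    // elapses first or the waiting thread is interrupted.
    static boolean until(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
    }
}

class AwaitDemo {
    public static void main(String[] args) {
        AtomicLong stored = new AtomicLong();

        // Stand-in for the Kafka listener thread: "stores" 3 documents after a delay.
        Thread listener = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            stored.set(3);
        });
        listener.start();

        // Wait for the simulated listener instead of asserting immediately.
        boolean done = Await.until(() -> stored.get() == 3,
                Duration.ofSeconds(5), Duration.ofMillis(50));
        System.out.println("listener finished in time: " + done);
    }
}
```

In the test from the question, this could be used after sending the message, for example by asserting that Await.until(() -> mongoTemplate.getCollection("reactions").countDocuments() == 3, Duration.ofSeconds(10), Duration.ofMillis(100)) returns true. The timeout and poll interval here are arbitrary values to tune for your setup.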

