adhikari
adhikari

Reputation: 112

Get data from topic after pushing in @EmbeddedKafka in spring boot Junit

I am writing Junit test cases(using @EmbeddedKafka) for my Spring Boot application which extensively uses Spring-kafka to communicate with other services and for other operations.

One typical case is deleting the data from kafka (which we are doing with pushing null message in kafka).

Currently in delete() method we are doing this by first checking if there exist any message in kafka that is requested to be deleted. Then we push null for that message key in Kafka

Steps followed in writing Junit for above method logic.

@Test
public void test(){
   //Push a message to Kafka (id=1234)
   //call test method service.delete(1234);
       //internally service.delete(1234) checks/validate whether message exists in kafka and then push null to delete topic.
  //check delete topic for delete message received.
  // Assertions
}

Problem here is Kafka always throws message not found exception. inside service.delete() method.

while checking logs in console. i figured out that my producer-config using a different port for kafka while consumer config using a diferent port.

I am not sure whether i have missed some minute detail or what is the reason for this behaviour. Any help will be appreciated.

Upvotes: 1

Views: 3469

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121552

I have this simple Spring Boot app for you to consider:

@SpringBootApplication
public class SpringBootEmbeddedKafkaApplication {

    public static final String MY_TOPIC = "myTopic";

    public BlockingQueue<String> kafkaMessages = new LinkedBlockingQueue<>();

    public static void main(String[] args) {
        SpringApplication.run(SpringBootEmbeddedKafkaApplication.class, args);
    }

    @KafkaListener(topics = MY_TOPIC)
    public void listener(String payload) {
        this.kafkaMessages.add(payload);
    }

}

The application.properties:

spring.kafka.consumer.group-id=myGroup
spring.kafka.consumer.auto-offset-reset=earliest

And test:

@RunWith(SpringRunner.class)
@SpringBootTest(properties =
        "spring.kafka.bootstrapServers:${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
@EmbeddedKafka(topics = SpringBootEmbeddedKafkaApplication.MY_TOPIC)
public class SpringBootEmbeddedKafkaApplicationTests {

    @Autowired
    private KafkaTemplate<Object, String> kafkaTemplate;

    @Autowired
    private SpringBootEmbeddedKafkaApplication kafkaApplication;

    @Test
    public void testListenerWithEmbeddedKafka() throws InterruptedException {
        String testMessage = "foo";
        this.kafkaTemplate.send(SpringBootEmbeddedKafkaApplication.MY_TOPIC, testMessage);

        assertThat(this.kafkaApplication.kafkaMessages.poll(10, TimeUnit.SECONDS)).isEqualTo(testMessage);
    }

}

Pay attention to the spring.kafka.consumer.auto-offset-reset=earliest to let consumer to read from the beginning of the partition.

Also another important option to apply in the test is this:

@SpringBootTest(properties =
        "spring.kafka.bootstrapServers:${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")

The @EmbeddedKafka populates a spring.embedded.kafka.brokers system property and make Spring Boot auto-configuration to know about that we need to copy its value to the spring.kafka.bootstrapServers configuration property.

Or another option according our docs:

static {
    System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}

Upvotes: 2

Related Questions