Reputation: 112
I am writing Junit test cases(using @EmbeddedKafka) for my Spring Boot application which extensively uses Spring-kafka to communicate with other services and for other operations.
One typical case is deleting the data from kafka (which we are doing with pushing null message in kafka).
Currently in delete() method we are doing this by first checking if there exist any message in kafka that is requested to be deleted. Then we push null for that message key in Kafka
Steps followed in writing Junit for above method logic.
@Test
public void test(){
//Push a message to Kafka (id=1234)
//call test method service.delete(1234);
//internally service.delete(1234) checks/validate whether message exists in kafka and then push null to delete topic.
//check delete topic for delete message received.
// Assertions
}
Problem here is Kafka always throws message not found exception. inside service.delete() method.
while checking logs in console. i figured out that my producer-config using a different port for kafka while consumer config using a diferent port.
I am not sure whether i have missed some minute detail or what is the reason for this behaviour. Any help will be appreciated.
Upvotes: 1
Views: 3469
Reputation: 121552
I have this simple Spring Boot app for you to consider:
@SpringBootApplication
public class SpringBootEmbeddedKafkaApplication {
public static final String MY_TOPIC = "myTopic";
public BlockingQueue<String> kafkaMessages = new LinkedBlockingQueue<>();
public static void main(String[] args) {
SpringApplication.run(SpringBootEmbeddedKafkaApplication.class, args);
}
@KafkaListener(topics = MY_TOPIC)
public void listener(String payload) {
this.kafkaMessages.add(payload);
}
}
The application.properties
:
spring.kafka.consumer.group-id=myGroup
spring.kafka.consumer.auto-offset-reset=earliest
And test:
@RunWith(SpringRunner.class)
@SpringBootTest(properties =
"spring.kafka.bootstrapServers:${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
@EmbeddedKafka(topics = SpringBootEmbeddedKafkaApplication.MY_TOPIC)
public class SpringBootEmbeddedKafkaApplicationTests {
@Autowired
private KafkaTemplate<Object, String> kafkaTemplate;
@Autowired
private SpringBootEmbeddedKafkaApplication kafkaApplication;
@Test
public void testListenerWithEmbeddedKafka() throws InterruptedException {
String testMessage = "foo";
this.kafkaTemplate.send(SpringBootEmbeddedKafkaApplication.MY_TOPIC, testMessage);
assertThat(this.kafkaApplication.kafkaMessages.poll(10, TimeUnit.SECONDS)).isEqualTo(testMessage);
}
}
Pay attention to the spring.kafka.consumer.auto-offset-reset=earliest
to let consumer to read from the beginning of the partition.
Also another important option to apply in the test is this:
@SpringBootTest(properties =
"spring.kafka.bootstrapServers:${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
The @EmbeddedKafka
populates a spring.embedded.kafka.brokers
system property and make Spring Boot auto-configuration to know about that we need to copy its value to the spring.kafka.bootstrapServers
configuration property.
Or another option according our docs:
static {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
Upvotes: 2