Reputation: 3127
I'm testing my Kafka Consumer in Spring Boot. My consumer are similar to the following
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaPaymentConsumer {
private final PaymentInterface paymentInterface;
@KafkaListener(topics = "#{'${kafka.topic.payment}'}",
groupId = "#{'${kafka.group-id}'}")
public void consumePaymentEvents(PaymentEvent paymentEvent) {
paymentInterface.handleReceiptPaymentReceivedEvent(paymentEvent);
}
}
And My test cases are similar to the following
@SpringBootTest
@EmbeddedKafka(brokerProperties = {"listeners=PLAINTEXT://localhost:9092"},
partitions = 1,
controlledShutdown = true)
class KafkaPaymentConsumerTest {
@Autowired
KafkaTemplate<String, PaymentEvent> kafkaTemplate;
@Autowired
private ObjectMapper objectMapper;
@Value("${kafka.topic.payment}")
private String paymentTopic;
@SpyBean
private KafkaPaymentConsumer kafkaPaymentConsumer;
@SpyBean
private PaymentInterface paymentInterface;
@Captor
ArgumentCaptor<PaymentEvent> paymentEventCaptor;
private static File PAYMENT_EVENT_JSON = Paths.get("src", "test", "resources", "files",
"Payment.json").toFile();
@Test
@SneakyThrows
@DirtiesContext
void consumePaymentEvents() {
PaymentEvent event = objectMapper.readValue(PAYMENT_EVENT_JSON,
PaymentEvent.class);
kafkaTemplate.send(paymentTopic, "1", event);
verify(kafkaPaymentConsumer, timeout(10000).times(1)).consumePaymentEvents(
paymentEventCaptor.capture());
PaymentEvent argument = paymentEventCaptor.getValue();
verify(paymentInterface, timeout(10000).times(1)).handleReceiptPaymentReceivedEvent(any());
}
}
the test works well, BUT when running a batch of tests at once, some tests fail ! ( only when I run many tests at the same time !! ) it seems that there is an issue in the context with @EmbeddedKafka
I got like theses log errors
Actually, there were zero interactions with this mock.
or a Timeout when trying to poll records from the broker
Any explanation or suggestion please
Upvotes: 1
Views: 1773
Reputation: 143
Adding @DirtiesContext
is required but may not be sufficient. The test should also wait for the consumer to be ready, which could be done with spring-kafka-test
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@BeforeEach
void setup( ) {
for ( MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers() ) {
ContainerTestUtils.waitForAssignment( messageListenerContainer,
embeddedKafkaBroker.getPartitionsPerTopic() );
}
}
Upvotes: 2
Reputation: 121272
Since you don’t use a @DirtiesContext
on your test class to close an application context in the end, it is not a surprise that other tests for the same topic can steal data from you. See if you can clean up contexts as I explained, or consider to use different topics in different tests. I’d prefer the dirties context since it guarantees that no any extra resources in the memory to cause race conditions and surprises .
Upvotes: 1