Reputation: 825
I have a method for sending a Kafka message like this:
@Async
public void sendMessage(String topicName, Message message) {
    ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send(topicName, message);
    future.addCallback(new ListenableFutureCallback<>() {
        @Override
        public void onSuccess(SendResult<String, Message> result) {
            // do nothing
        }

        @Override
        public void onFailure(Throwable ex) {
            log.error("something wrong happened!");
        }
    });
}
And now I am writing unit tests for it. I would also like to test the two callback methods, onSuccess and onFailure, so my idea is to mock the KafkaTemplate, something like:
KafkaTemplate kafkaTemplate = Mockito.mock(KafkaTemplate.class);
But now I am stuck on what the mock should return in these two cases:
when(kafkaTemplate.send(anyString(), any(Message.class))).thenReturn(????);
What should I put in thenReturn for the success case and for the failure case? Does anyone have an idea? Thank you very much!
Upvotes: 17
Views: 23409
Reputation: 174799
You can mock the template, but it's better to mock the interface it implements (KafkaOperations).
MySender sender = new MySender();
KafkaOperations template = mock(KafkaOperations.class);
SettableListenableFuture<SendResult<String, String>> future = new SettableListenableFuture<>();
when(template.send(anyString(), any(Message.class))).thenReturn(future);
sender.setTemplate(template);
sender.send(...);
future.set(new SendResult<>(...));
...or...
future.setException(...);
EDIT: Updated to CompletableFuture (Spring for Apache Kafka 3.0.x and later)...
public class MySender {

    private KafkaOperations<String, String> template;

    public void setTemplate(KafkaOperations<String, String> template) {
        this.template = template;
    }

    public void send(String topic, Message<?> data) {
        CompletableFuture<SendResult<String, String>> future = this.template.send(data);
        future.whenComplete((result, ex) -> {
            if (ex == null) {
                System.out.println(result);
            }
            else {
                System.out.println(ex.getClass().getSimpleName() + "(" + ex.getMessage() + ")");
            }
        });
    }

}
@ExtendWith(OutputCaptureExtension.class)
public class So57475464ApplicationTests {

    @Test
    public void test(CapturedOutput captureOutput) {
        Message message = new GenericMessage<>("foo");
        MySender sender = new MySender();
        KafkaOperations template = mock(KafkaOperations.class);
        CompletableFuture<SendResult<String, String>> future = new CompletableFuture<>();
        given(template.send(any(Message.class))).willReturn(future);
        sender.setTemplate(template);
        sender.send("foo", message);
        future.completeExceptionally(new RuntimeException("foo"));
        assertThat(captureOutput).contains("RuntimeException(foo)");
    }

}
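The test above only exercises the failure branch. The success branch can be driven the same way: hand the sender a future you control, call send, then complete the future normally. Below is a minimal, JDK-only sketch of that idea covering both branches. MessageOperations, RecordingSender and FutureStubDemo are hypothetical names made up for illustration. They stand in for KafkaOperations and MySender and are not part of Spring for Apache Kafka, and the sketch records outcomes in a list rather than printing them so they are easy to assert on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for KafkaOperations, reduced to a single send method.
interface MessageOperations {
    CompletableFuture<String> send(String topic, String payload);
}

// Hypothetical sender that mirrors the whenComplete callback in MySender,
// but records each outcome in a list instead of printing it.
class RecordingSender {
    private final MessageOperations template;
    final List<String> events = new ArrayList<>();

    RecordingSender(MessageOperations template) {
        this.template = template;
    }

    void send(String topic, String payload) {
        template.send(topic, payload).whenComplete((result, ex) -> {
            if (ex == null) {
                events.add("success:" + result);
            } else {
                events.add("failure:" + ex.getClass().getSimpleName() + "(" + ex.getMessage() + ")");
            }
        });
    }
}

class FutureStubDemo {
    // Drives one send through a stub whose future the caller completes by hand.
    static List<String> run(boolean succeed) {
        CompletableFuture<String> future = new CompletableFuture<>();
        // The lambda plays the role of the mock: every send returns the same pending future.
        RecordingSender sender = new RecordingSender((topic, payload) -> future);
        sender.send("foo", "bar");
        // The callback has not fired yet; completing the future triggers it on this thread.
        if (succeed) {
            future.complete("sent");
        } else {
            future.completeExceptionally(new RuntimeException("foo"));
        }
        return sender.events;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // [success:sent]
        System.out.println(run(false)); // [failure:RuntimeException(foo)]
    }
}
```

With the real classes, the equivalent of the success case would be completing the mocked future with a SendResult instead of calling completeExceptionally, and then asserting on whatever the success branch produces.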
Upvotes: 27