Bali
Bali

Reputation: 825

How to mock result from KafkaTemplate

I have a method for sending kafka message like this:

@Async
public void sendMessage(String topicName, Message message) {
    ListenableFuture<SendResult<String, Message >> future = kafkaTemplate.send(topicName, message);

    future.addCallback(new ListenableFutureCallback<>() {

        @Override
        public void onSuccess(SendResult<String, Message > result) {
            //do nothing
        }

        @Override
        public void onFailure(Throwable ex) {
            log.error("something wrong happened"!);
        }
    });
}

And now I am writing unit tests for it. I would like to test also the two callback methods onSuccess and onFailure methods, so my I idea is to mock the KafkaTemplate, something like :

KafkaTemplate kafkaTemplate = Mockito.mock(KafkaTemplate.class);

But now I am getting stuck on the mocking result for these two cases:

when(kafkaTemplate.send(anyString(), any(Message.class))).thenReturn(????);

what should I put in the thenReturn method for the case success and for the case failure? Does anyone have an idea please? Thank you very much!

Upvotes: 17

Views: 23409

Answers (1)

Gary Russell
Gary Russell

Reputation: 174799

You can mock the template but it's better to mock the interface.

    MySender sender = new MySender();
    KafkaOperations template = mock(KafkaOperations.class);
    SettableListenableFuture<SendResult<String, String>> future = new SettableListenableFuture<>();
    when(template.send(anyString(), any(Message.class))).thenReturn(future);
    sender.setTemplate(template);
    sender.send(...);

    future.set(new SendResult<>(...));
    
    ...or...

    future.setException(...

EDIT

Updated to CompletableFuture (Spring for Apache Kafka 3.0.x and later)...

public class MySender {

    private  KafkaOperations<String, String> template;

    public void setTemplate(KafkaOperations<String, String> template) {
        this.template = template;
    }

    public void send(String topic, Message<?> data) {
        CompletableFuture<SendResult<String, String>> future = this.template.send(data);
        future.whenComplete((result, ex) -> {
            if (ex == null) {
                System.out.println(result);
            }
            else {
                System.out.println(ex.getClass().getSimpleName() + "(" + ex.getMessage() + ")");
            }
        });
    }

}

@ExtendWith(OutputCaptureExtension.class)
public class So57475464ApplicationTests {

    @Test
    public void test(CapturedOutput captureOutput) {
        Message message = new GenericMessage<>("foo");
        MySender sender = new MySender();
        KafkaOperations template = mock(KafkaOperations.class);
        CompletableFuture<SendResult<String, String>> future = new CompletableFuture<>();
        given(template.send(any(Message.class))).willReturn(future);
        sender.setTemplate(template);
        sender.send("foo", message);
        future.completeExceptionally(new RuntimeException("foo"));
        assertThat(captureOutput).contains("RuntimeException(foo)");
    }

}

Upvotes: 27

Related Questions