sFishman
sFishman

Reputation: 133

What to assert in a unit test for Kafka producer void send method

Here is my producer send method that I'm writing a unit test for:

public void send(final ReadingForecastRequest requestStub) {
        logger.info("{}::send()", getClass().getCanonicalName());
        final ProducerRecord<String, ReadingForecastRequest> record = new ProducerRecord<>(config.topicName, requestStub);
        String context = requestStub.getHeader().getContext();
        Long trackingId = requestStub.getHeader().getTrackingId();
        record.headers().add("context", context.getBytes());
        record.headers().add("trackingId", Longs.toByteArray(trackingId));
        ListenableFuture future = template.send((ProducerRecord<String, Req>) record);

        future.addCallback(new ListenableFutureCallback<SendResult<String, Req>>() {

            @Override
            public void onSuccess(SendResult<String, Req> result) {
                logger.info("Sent message=[" + requestStub +
                    "] with offset=[" + result.getRecordMetadata().offset() + "]");
            }

            @Override
            public void onFailure(Throwable ex) {
                logger.error("Unable to send message=["
                    + requestStub + "] due to : " + ex.getMessage());
            }
        });
    }

Here is my test

public class ProducerTest {

    final KafkaTemplate<String, Integer> template = createKafkaTemplateMock();
    final String topicName = UUID.randomUUID().toString();
    final int sendTimeout = 1;
    final Producer producer = createProducer(topicName, sendTimeout);

    @Test
    @SuppressWarnings("unchecked")
    public void testSuccessfulSend() {
        ProducerRecord<String, String> record = mock(ProducerRecord.class);
        RecordMetadata recordMetadata = new RecordMetadata(new TopicPartition(topicName, 0), 0, 0L, 0L, 0L, 0, 0);
        SettableListenableFuture<SendResult<String, String>> future = new SettableListenableFuture<>();
        final ReadingForecastRequest request = new ReadingForecastRequest(0, 0, 1.0, 20, new Double[] { 567.8, 976.5 }, header);
        when(template.send((ProducerRecord) any())).thenReturn(future);
        producer.send(request);
        future.set(new SendResult<String, String>(record, recordMetadata));
        verify(future, times(1)).onSuccess(SendResult.class);
    }
}

My question is on the last line of my test (which doesn't work). I'm not sure what to assert. I want to just check that the onSuccess method within the callback ran to see that the send was successful, but it's not recognizing "onSuccess". Is there something else I should be asserting?

I based my test on this stackoverflow question, which in the edited answer, captures future output and tests that. I'm having trouble understanding how that really tests the method and how to see what that output would be (e.g. for a successful run) if I were to test that.

Upvotes: 0

Views: 545

Answers (0)

Related Questions