Mangesh Pawar
Mangesh Pawar

Reputation: 21

Issue in Kafka Spring boot test cases for AdminClient

I am writing unit test cases for below class . I'm trying to mock admin client , so that i can call the below method create topic. But getting null pointer exception.

@Service
public class TopicService {

  private static final Logger LOG = LoggerFactory.getLogger(TopicService.class);
@Autowired
  private AdminClient adminClient;

public void createTopic(Topic topic) throws ExecutionException, InterruptedException {
    adminClient
            .createTopics(Collections.singletonList(ServiceHelper.fromTopic(topic)))
            .values()
            .get(topic.getName())
            .get();
  }
}

The unit test case is as follows

package org.kafka.service;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.kafka.model.Topic;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.*;
import java.util.concurrent.ExecutionException;

import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.admin.AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME;

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {TopicService.class})
public class TopicServiceTest {

    @Autowired
    TopicService topicService;

    @MockBean
    AdminClient adminClient;
    ListTopicsResult listTopicsResult;

    KafkaFuture<Set<String>> future;

    NewTopic newTopic;

    Topic topic;

    Collection<NewTopic> topicList;

    CreateTopicsResult createTopicsResult;
    Void t;

    Map<String,KafkaFuture<Void>> futureMap;

    private static final String TARGET_CONSUMER_GROUP_ID = "target-group-id";

    private static final Map<String, Object> CONF = new HashMap<>();

    @BeforeClass
    public static void createAdminClient() {
        try {
            CONF.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            CONF.put(REQUEST_TIMEOUT_MS_CONFIG, 120000);
            CONF.put("zookeeper.connect", "localhost:21891");

           AdminClient adminClient = AdminClient.create(CONF);
        } catch (Exception e) {
            throw new RuntimeException("create kafka admin client error", e);
        }
    }

    @Before
    public void setUp(){
        topicList = new ArrayList<>();
        newTopic = new NewTopic("topic-7",1, (short) 1);
        topicList.add(newTopic);
        futureMap = new HashMap<>();
        topic = new Topic();
        topic.setName("topic-1");
    }

    @Test
    public void createTopic() throws ExecutionException, InterruptedException {

        Properties consumerProperties = new Properties();

        Mockito.when(adminClient.createTopics(topicList))
                .thenReturn(Mockito.mock(CreateTopicsResult.class));

        Mockito.when(adminClient.createTopics(topicList).values())
                .thenReturn(Mockito.mock(Map.class));

        Mockito.when(adminClient.createTopics(topicList)
                .values()
                .get(GROUP_METADATA_TOPIC_NAME)).thenReturn(Mockito.mock(KafkaFutureImpl.class));

        Mockito.when(adminClient.createTopics(topicList)
                .values()
                .get(GROUP_METADATA_TOPIC_NAME)
                .get()).thenReturn(t);
         topicService.createTopic(topic);
    }
}

package org.kafka.config;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.kafka.reader.Kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

@Component
public class AdminConfigurer {


    @Autowired
    private Kafka kafkaConfig;



    @Bean
    public Map<String, Object> kafkaAdminProperties() {
        final Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers());
        if(kafkaConfig.getProperties().getSasl().getEnabled() && kafkaConfig.getSsl().getEnabled()) {
            configs.put("sasl.mechanism", kafkaConfig.getProperties().getSasl().getMechanism());
            configs.put("security.protocol", kafkaConfig.getProperties().getSasl().getSecurity().getProtocol());
            configs.put("ssl.keystore.location", kafkaConfig.getSsl().getKeystoreLocation());
            configs.put("ssl.keystore.password", kafkaConfig.getSsl().getKeystorePassword());
            configs.put("ssl.truststore.location", kafkaConfig.getSsl().getTruststoreLocation());
            configs.put("ssl.truststore.password", kafkaConfig.getSsl().getTruststorePassword());
            configs.put("sasl.jaas.config", String.format(kafkaConfig.getJaasTemplate(),
                    kafkaConfig.getProperties().getSasl().getJaas().getConfig().getUsername(),
                    kafkaConfig.getProperties().getSasl().getJaas().getConfig().getPassword()));
            configs.put("ssl.endpoint.identification.algorithm", "");
        }
        return configs;
    }


    @Bean
    public AdminClient getClient() {
        return AdminClient.create(kafkaAdminProperties());
    }

}

I expected the below test case run successfully. But i'm getting below error.

java.lang.NullPointerException
    at org.kafka.service.TopicService.createTopic(TopicService.java:57)
    at org.kafka.service.TopicServiceTest.createTopic(TopicServiceTest.java:100)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)

I'm using 2.7.1 spring version with following client dependency.

                <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.1.1</version>
            <scope>test</scope>
            <classifier>test</classifier>
        </dependency>

                 <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

Upvotes: 2

Views: 610

Answers (2)

estarko
estarko

Reputation: 11

I would try changing the way in which you do the calls inside Mockito.when()

 Properties consumerProperties = new Properties();
 CreateTopicsResult cTR= Mockito.mock(CreateTopicsResult.class);

    Mockito.when(adminClient.createTopics(topicList))
            .thenReturn(cTR);
    Map<String,KafkaFutureImpl> map= Mockito.mock(Map.class);
    Mockito.when(cTR.values())
            .thenReturn(map); 
    KafkaFutureImpl kFI= Mockito.mock(KafkaFutureImpl.class);
  Mockito.when(map.get(GROUP_METADATA_TOPIC_NAME)).thenReturn(kFI);

    Mockito.when(kFI.get()).thenReturn(t);
    topicService.createTopic(topic);

I would try with this code as I usually write these tests by this way, but I haven´t checked the result.

Good luck

Upvotes: 0

Lilia
Lilia

Reputation: 77

I recommend @Configuration instead of @Component, so that Spring could pick up beans.

Upvotes: 0

Related Questions