Programmer
Programmer

Reputation: 87

Spring boot test KafkaTemplate

I have a service that sends a message:

@Service
class ExportTaskService {

  @Autowired
  private KafkaTemplate<String, Object> template;
  
  public void exportNewTask(ImportTaskRequest req) {
    template.send('my-topic-name', req)
  }

}

I configured the beans consumerFactory, producerFactory, and kafkaTemplate (in src/main/java).

If I run the application and execute the method, everything works and the message arrives in the real message broker.

Now I need a Spring test that calls ExportTaskService.exportNewTask(request) and then waits for the message on the same topic.

My code, which does not work (I can't receive the message):

    // Spring Boot test from the question: it sends one request through ExportTaskService and
    // then tries to read it back from the "new-bitrix-leads" topic using a raw Kafka consumer.
    @RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@TestPropertySource(locations="classpath:test.properties")
@EnableKafka
@EmbeddedKafka(
        topics = "new-bitrix-leads", ports = 9092
)
public class ExportingLeadTests {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Autowired
    ExportTaskService exportTaskService;

    // Consumer factory taken from the application context (the question says it is configured
    // under src/main/java).
    @Autowired
    ConsumerFactory<String, Object> consumerFactory;

    @Test
    public void test() throws InterruptedException {
        // Sanity checks that both beans were injected.
        assert(embeddedKafkaBroker != null);
        assert(exportTaskService != null);

        // NOTE(review): the consumer is subscribed and the message is sent with no poll() in
        // between; presumably no partitions are assigned yet when the message is produced, so
        // whether it is seen later would depend on the consumer's auto.offset.reset — confirm.
        // NOTE(review): the ExportTaskService snippet in this question sends to "my-topic-name",
        // while this test subscribes to "new-bitrix-leads" — verify both use the same topic.
        // NOTE(review): the consumer created here is never closed in this method.
        Consumer<String, Object> consumer =  consumerFactory.createConsumer();
        consumer.subscribe(Collections.singletonList("new-bitrix-leads"));
        exportTaskService.exportNewTask(ImportTaskRequest.builder()
                .description("descr")
                .title("title")
                .build());
        // A single poll with a 3-second timeout; the test expects exactly one record from it.
        ConsumerRecords<String, Object> records = consumer.poll(Duration.ofSeconds(3));
        assert (records.count() == 1);
    }
}

How can I read this message? What do I need to do? I have no ideas...

BIG TNX :) !!

Upvotes: 0

Views: 1505

Answers (1)

Programmer
Programmer

Reputation: 87

My simple solution is:

/**
 * Integration test that sends a request through {@link ExportTaskService} and checks that it
 * arrives on the {@code new-bitrix-leads} topic of an embedded Kafka broker.
 *
 * <p>A listener container is started once for the whole class and pushes every received record
 * into a queue, so the test can wait for a record with a timeout.
 */
@SpringBootTest
@DirtiesContext
@TestPropertySource(locations="classpath:test.properties")
@EnableKafka
@EmbeddedKafka(
        topics = ExportingLeadTests.TOPIC, ports = 9092
)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class ExportingLeadTests {

    /** Topic created on the embedded broker and listened to by the test container. */
    static final String TOPIC = "new-bitrix-leads";

    /** Records received by the listener container, in arrival order. */
    private BlockingQueue<ConsumerRecord<String, Object>> records;

    /** Listener container; typed like the consumer factory and the queue it feeds. */
    private KafkaMessageListenerContainer<String, Object> container;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Autowired
    ExportTaskService exportTaskService;

    /**
     * Starts a listener container on {@link #TOPIC} and waits for its partition assignment
     * before any test runs.
     */
    @BeforeAll
    void setUp() {
        DefaultKafkaConsumerFactory<String, Object> consumerFactory =
                new DefaultKafkaConsumerFactory<>(getConsumerProperties());
        ContainerProperties containerProperties = new ContainerProperties(TOPIC);
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        records = new LinkedBlockingQueue<>();
        container.setupMessageListener((MessageListener<String, Object>) records::add);
        container.start();
        ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
    }

    /** Stops the listener container if {@link #setUp()} got far enough to create it. */
    @AfterAll
    void tearDown() {
        if (container != null) {
            container.stop();
        }
    }

    /**
     * Builds the consumer configuration pointing at the embedded broker.
     *
     * @return consumer properties; keys and values are both read with {@code StringDeserializer}
     */
    private Map<String, Object> getConsumerProperties() {
        Map<String, Object> map = new HashMap<>();
        map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker.getBrokersAsString());
        map.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer");
        map.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        map.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10");
        map.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");
        map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        map.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return map;
    }

    /**
     * Sends one request through the service and expects a record whose text contains
     * {@code gclid} to arrive within five seconds.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the record
     */
    @Test
    public void test() throws InterruptedException {
        exportTaskService.exportNewTask(ImportTaskRequest.builder()
                .description("descr")
                .title("title")
                .gclid("gclid")
                .id("id")
                .name("my name")
                .website("http://website.com")
                .yclid("yclid")
                .source("Source")
                .build());
        ConsumerRecord<String, Object> record = records.poll(5, TimeUnit.SECONDS);
        // Explicit check rather than the assert keyword, which is skipped when the JVM runs
        // with assertions disabled.
        if (record == null) {
            throw new AssertionError("No record received on " + TOPIC + " within 5 seconds");
        }
        assertThat(record.value().toString(), containsString("gclid"));
    }
}

It really works. Nice.

Upvotes: 2

Related Questions