Mauricio Avendaño

Reputation: 851

Test Kafka listener with custom container factory in Spring Boot

I'm having a lot of trouble testing a Kafka listener.

The class is the following:

@Component
class KafkaListener(
       private val useCase: UseCase
) {
   @KafkaListener(
           topics = ["topic"],
           groupId = "demo",
           // this is giving me problems, everything works well when I remove it
           containerFactory = "containerFactory",
           clientIdPrefix = "prefix"
   )
   fun listener(message: String) {
       println("message received!")
       useCase.foo()
   }
}

The test:

@TestConfiguration
class TestConfig {
    @Bean
    fun containerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        val props = HashMap<String, Any?>()
        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "127.0.0.1:9092"
        val defaultConsumer = DefaultKafkaConsumerFactory(props, StringDeserializer(), StringDeserializer())
        factory.consumerFactory = defaultConsumer

        return factory
    }
}

@Testcontainers
@SpringBootTest(properties = ["spring.kafka.consumer.auto-offset-reset=earliest"])
@EnableAutoConfiguration(exclude = [MongoAutoConfiguration::class])
@Import(KafkaListener::class, TestConfig::class)
class KafkaShippingGroupCreationListenerAdapterTest {

    companion object {
        @Container
        private val kafkaContainer = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.3.3"))

        @JvmStatic
        @DynamicPropertySource
        fun overrideProperties(registry: DynamicPropertyRegistry) {
            registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers)
        }
    }

    @MockBean
    private lateinit var useCase: UseCase

    @Autowired
    private lateinit var kafkaTemplate: KafkaTemplate<String, Any>

    @Autowired
    private lateinit var kafkaListener: KafkaListener

    @Test
    fun `should invoke the use case`() {
        kafkaTemplate.send("topic", "message")

        verify(useCase,
                timeout(5000).times(1)).foo()
    }
}

Everything works when I don't specify a container factory on the listener function, but when I do, the listener does not pick up the Kafka bootstrap server provided by the container.

Upvotes: 1

Views: 795

Answers (1)

Gary Russell

Reputation: 174689

Why would you expect it to work? Boot's auto-configured factory uses the spring.kafka.bootstrap-servers property set up in the test, but yours is hard-coded to "127.0.0.1:9092".
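
One way to address this, as a minimal sketch rather than the only fix, is to have the custom factory read the same spring.kafka.bootstrap-servers property that the test registers through @DynamicPropertySource. The @Value injection and the explicit auto.offset.reset entry are assumptions added for illustration: the spring.kafka.consumer.auto-offset-reset property in the test only affects Boot's auto-configured consumer factory, so a hand-built factory has to set it itself.

```kotlin
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.test.context.TestConfiguration
import org.springframework.context.annotation.Bean
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory

@TestConfiguration
class TestConfig {
    @Bean
    fun containerFactory(
        // Resolved from the environment, so it picks up the address
        // registered by the test's @DynamicPropertySource method.
        @Value("\${spring.kafka.bootstrap-servers}") bootstrapServers: String
    ): ConcurrentKafkaListenerContainerFactory<String, String> {
        val props = HashMap<String, Any>()
        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
        // Assumption: read from the start of the topic so a message sent
        // before the consumer is assigned its partitions is not missed.
        props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"

        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory =
            DefaultKafkaConsumerFactory(props, StringDeserializer(), StringDeserializer())
        return factory
    }
}
```

The group id still comes from the groupId attribute on the @KafkaListener annotation, so it does not need to be repeated in the consumer properties here.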

Upvotes: 1
