Reputation: 851
I'm having big troubles trying to test a kafka listener.
The class is the following:
@Component
class KafkaListener(
private val useCase: UseCase
) {
@KafkaListener(
topics = ["topic"],
groupId = "demo",
// this is giving me problems, everything works well when I remove it
containerFactory = "containerFactory",
clientIdPrefix = "prefix"
)
fun listener(message: String) {
println("message received!")
useCase.foo()
}
}
The test:
@TestConfiguration
class TestConfig {
@Bean
fun containerFactory(): ConcurrentKafkaListenerContainerFactory1<String, String> {
val factory = ConcurrentKafkaListenerContainerFactory1<String, String>()
val props = HashMap<String, Any?>()
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "127.0.0.1:9092"
val defaultConsumer = DefaultKafkaConsumerFactory(props, StringDeserializer(), StringDeserializer())
factory.consumerFactory = defaultConsumer
return factory
}
}
@Testcontainers
@SpringBootTest(properties = ["spring.kafka.consumer.auto-offset-reset=earliest"])
@EnableAutoConfiguration(exclude = [MongoAutoConfiguration::class])
@Import(KafkaListener::class, TestConfig::class)
class KafkaShippingGroupCreationListenerAdapterTest {
companion object {
@Container
private val kafkaContainer = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.3.3"))
@JvmStatic
@DynamicPropertySource
fun overrideProperties(registry: DynamicPropertyRegistry) {
registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
}
}
@MockBean
private lateinit var useCase: UseCase
@Autowired
private lateinit var kafkaTemplate: KafkaTemplate<String, Any>
@Autowired
private lateinit var kafkaListener: KafkaListener
@Test
fun `should invoke the use case`() {
kafkaTemplate.send("topic", "message")
verify(useCase,
timeout(5000).times(1)).foo()
}
}
Everything works well when I don't specify a factory in the listener function but when I specify a factory the listener is not being updated with the kafka boostrap server given by the container.
Upvotes: 1
Views: 795
Reputation: 174689
Why would you expect it to work? Boot's auto configured factory uses the spring.kafka.bootstrap-servers
property set up in the test, but yours is hard coded to "127.0.0.1:9092"
.
Upvotes: 1