Andrwy J

Reputation: 1

Dynamic Kafka request-reply topics in ConcurrentKafkaListenerContainerFactory

I am trying to use the Kafka request-reply template (ReplyingKafkaTemplate). I have these beans, which create the container that listens for replies and the template itself:

    @Bean 
    fun repliesContainer(containerFactory: ConcurrentKafkaListenerContainerFactory<String, Any>)
            : ConcurrentMessageListenerContainer<String, Any> {
        val repliesContainer: ConcurrentMessageListenerContainer<String, Any> =
            containerFactory.createContainer("replay")
        repliesContainer.containerProperties.setGroupId("my-kafka-group")
        repliesContainer.isAutoStartup = true
        return repliesContainer
    }

    @Bean
    fun replyingTemplate(
        pf: ProducerFactory<String, Any>?,
        repliesContainer: ConcurrentMessageListenerContainer<String, Any>?
    ): ReplyingKafkaTemplate<String, Any, Any> {
        return ReplyingKafkaTemplate(pf, repliesContainer)
    }

And I have this method for sending a message:

    @Throws(Exception::class)
    fun kafkaRequestReply(request: MessageStructure): Any {
        val record = ProducerRecord<String, Any>(request.sendTo, request.content)
        record.headers().add(RecordHeader(KafkaHeaders.REPLY_TOPIC, request.replyTo.toByteArray()))
        record.headers().add("timeStamp", request.timeStamp.toString().toByteArray())
        val replyFuture: RequestReplyFuture<String, Any, Any> =
            template.sendAndReceive(record)
        // Wait for the send to complete, then wait for the reply record.
        val sendResult: SendResult<String, Any> = replyFuture.sendFuture.get(10, TimeUnit.SECONDS)
        val consumerRecord = replyFuture.get(10, TimeUnit.SECONDS)
        return consumerRecord.value()
    }

How can I dynamically set the reply topic of repliesContainer to MessageStructure.replyTo from the kafkaRequestReply method?

I have been looking for an overload of sendAndReceive that lets me specify the topic on which I want to receive the reply.

Upvotes: 0

Views: 26

Answers (0)
