Reputation: 1
I try to use kafka replay-request template.And have this bean to create conteiners for listen replay
@Bean
fun repliesContainer(containerFactory: ConcurrentKafkaListenerContainerFactory<String, Any>)
: ConcurrentMessageListenerContainer<String, Any> {
val repliesContainer: ConcurrentMessageListenerContainer<String, Any> =
containerFactory.createContainer("replay")
repliesContainer.containerProperties.setGroupId("my-kafka-group")
repliesContainer.isAutoStartup = true
return repliesContainer
}
@Bean
fun replyingTemplate(
pf: ProducerFactory<String, Any>?,
repliesContainer: ConcurrentMessageListenerContainer<String, Any>?
): ReplyingKafkaTemplate<String, Any, Any> {
return ReplyingKafkaTemplate(pf, repliesContainer)
}
And i have this method for send message
@Throws(Exception::class)
fun kafkaRequestReply(request: MessageStructure): Any {
val record = ProducerRecord<String, Any>(request.sendTo, request.content)
record.headers().add(RecordHeader(KafkaHeaders.REPLY_TOPIC, request.replyTo.toByteArray()))
record.headers().add("timeStamp", request.timeStamp.toString().toByteArray())
val replyFuture: RequestReplyFuture<String, Any, Any> =
template.sendAndReceive(record)
val sendResult: SendResult<String, Any> = replyFuture.sendFuture[10, TimeUnit.SECONDS]
val consumerRecord = replyFuture[10, TimeUnit.SECONDS]
return consumerRecord.value()
}
How i can dynamically set replay topic in repliesContainer
MessageStructure.replyTo
from method kafkaRequestReply
I trying to find some overload method sendAndReceive
where i can say in which topic i want to get replay
Upvotes: 0
Views: 26