Reputation: 71
Please find the use case we need to implement.
First, we need to invoke a Kafka producer a message as a rest service, they will process and give back the response in another topic.
For us, It is a request-reply topic we need to reply back for the same request the response, using replykafka template is working fine, but we can set co-relation id in the header.
As a topic message metadata there are sending in attributes, is there any way to map the co-relation id with request topic message and reply topic message.
Explain to you better.
One microservice expects the payload as given below with correlationId in payload.
{
"operationDate": "2020-09-16T11:58:25",
"correlationId": "-5544538377183901824042719876882142227",
"birthDate": "2013-12-12",
"firstNameEn": "boby",
"firstNameAr": "الشيخ",
}
The microservice will process the payload and will give a response in another topic as.
{
"correlationId": -5544538377183901824042719876882142227,
"consumerId": null,
"userid": 123456,
"statusCode": "SUCCESS",
"errors": null
}
Now as this we need to implement using spring ReplyingKafkaTemplate.
As ReplyingKafkaTemplate will work with correlationId in the header only
Upvotes: 2
Views: 2316
Reputation: 71
Thanks for the hint.
I have done as overriding the payload with Kafka header correlationId.
@Override
protected ListenableFuture<SendResult> doSend(ProducerRecord producerRecord) {
if(producerRecord.value()!=null){
// i have appeneded the header correlationId in th payload
}
return super.doSend(producerRecord);
}
And in Replay onMessage ,i have populated the response payload correlationId to the header.
@Override
public void onMessage(List<ConsumerRecord<K, R>> data) {
data.forEach(
krConsumerRecord -> //update each record header
);
super.onMessage(data);
}
In this way was successfully integrated request-response semantics with correlationId in the request and response payload.
Upvotes: 0
Reputation: 174494
Assuming you mean you want to include the topic(s) in the correlation id, see
/**
* Set a function to be called to establish a unique correlation key for each request
* record.
* @param correlationStrategy the function.
* @since 2.3
*/
public void setCorrelationIdStrategy(Function<ProducerRecord<K, V>, CorrelationKey> correlationStrategy) {
You can create your own correlation id, based on the ProducerRecord
(which has the topic()
).
You just need to make sure it is unique. If you manually set the KafkaHeaders.REPLY_TOPIC
, it will be visible to the strategy.
EDIT
With the correlation id in the payload, use setCorrelationIdStrategy
to extract the correlationId from the payload and add a RecordInterceptor
to do the same on the reply side.
Upvotes: 1