I'm sending a message to Kafka using the ReplyingKafkaTemplate and it's sending the message with a kafka_correlationId. However, when it hits my @KafkaListener method and forwards it to a reply topic, the headers are lost.
How do I preserve the Kafka headers?
Here's my method signature:
@KafkaListener(topics = "input")
@SendTo("reply")
public List<CustomOutput> consume(List<CustomInput> inputs) {
    ... /* some processing */
    return outputs;
}
I've created a ProducerInterceptor so I can see what headers are being sent from the ReplyingKafkaTemplate, as well as from the @SendTo annotation. From that, another strange thing is that the ReplyingKafkaTemplate is not adding the documented kafka_replyTopic header to the message.
Here's how the ReplyingKafkaTemplate is configured:
@Bean
public KafkaMessageListenerContainer<Object, Object> replyContainer(ConsumerFactory<Object, Object> cf) {
    ContainerProperties containerProperties = new ContainerProperties(requestReplyTopic);
    return new KafkaMessageListenerContainer<>(cf, containerProperties);
}

@Bean
public ReplyingKafkaTemplate<Object, Object, Object> replyingKafkaTemplate(
        ProducerFactory<Object, Object> pf,
        KafkaMessageListenerContainer<Object, Object> container) {
    return new ReplyingKafkaTemplate<>(pf, container);
}
I'm not sure if this is relevant, but I've also added Spring Cloud Sleuth as a dependency. The span/trace headers are present when I send messages, but new ones are generated when a message is forwarded.
Upvotes: 0
Views: 3817
Reputation: 174494
Arbitrary headers from the request message are not copied to the reply message by default, only the kafka_correlationId.
Starting with version 2.2, you can configure a ReplyHeadersConfigurer, which is called to determine which header(s) should be copied.
See the documentation:
"Starting with version 2.2, you can add a ReplyHeadersConfigurer to the listener container factory. This is consulted to determine which headers you want to set in the reply message."
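For 2.2 and later, a minimal sketch of wiring a ReplyHeadersConfigurer into the listener container factory might look like the following. The class name ReplyHeaderConfig and the choice to copy only a header called "myHeader" are assumptions for illustration; the sketch also assumes a ConsumerFactory and a KafkaTemplate bean already exist in the application context.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;

// Hypothetical configuration class; names are illustrative only.
@Configuration
public class ReplyHeaderConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConsumerFactory<Object, Object> consumerFactory,
            KafkaTemplate<Object, Object> replyTemplate) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);

        // The template used by @SendTo to publish the listener's return value.
        factory.setReplyTemplate(replyTemplate);

        // ReplyHeadersConfigurer.shouldCopy(headerName, headerValue):
        // return true for each request header that should be copied to the reply.
        // Here only the (assumed) "myHeader" header is copied.
        factory.setReplyHeadersConfigurer((name, value) -> "myHeader".equals(name));

        return factory;
    }
}
```

The predicate can be widened to match whatever headers need to survive the hop, for example a set of tracing header names, as long as it stays selective about what gets copied.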
EDIT
BTW, in 2.2 the ReplyingKafkaTemplate sets the kafka_replyTopic header automatically if it is not already present.
With 2.1.x, it can be done, but it's a bit involved and you have to do some of the work yourself. The key is to receive and reply with a Message<?>:
@KafkaListener(id = "so55622224", topics = "so55622224")
@SendTo("dummy.we.use.the.header.instead")
public Message<?> listen(Message<String> in) {
    System.out.println(in);
    Headers nativeHeaders = in.getHeaders().get(KafkaHeaders.NATIVE_HEADERS, Headers.class);
    byte[] replyTo = nativeHeaders.lastHeader(KafkaHeaders.REPLY_TOPIC).value();
    byte[] correlation = nativeHeaders.lastHeader(KafkaHeaders.CORRELATION_ID).value();
    return MessageBuilder.withPayload(in.getPayload().toUpperCase())
            .setHeader("myHeader", nativeHeaders.lastHeader("myHeader").value())
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .build();
}

// This is used to send the reply - needs a header mapper
@Bean
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory) {
    KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
    MessagingMessageConverter messageConverter = new MessagingMessageConverter();
    messageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper("*")); // map all byte[] headers
    kafkaTemplate.setMessageConverter(messageConverter);
    return kafkaTemplate;
}

@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
    return args -> {
        Headers headers = new RecordHeaders();
        headers.add(new RecordHeader("myHeader", "myHeaderValue".getBytes()));
        headers.add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "so55622224.replies".getBytes())); // automatic in 2.2
        ProducerRecord<String, String> record = new ProducerRecord<>("so55622224", null, null, "foo", headers);
        RequestReplyFuture<String, String, String> future = template.sendAndReceive(record);
        ConsumerRecord<String, String> reply = future.get();
        System.out.println("Reply: " + reply.value() + " myHeader="
                + new String(reply.headers().lastHeader("myHeader").value()));
    };
}