George

Reputation: 3020

Spring Kafka @SendTo Not Sending Headers

I'm sending a message to Kafka using the ReplyingKafkaTemplate and it's sending the message with a kafka_correlationId. However, when it hits my @KafkaListener method and forwards it to a reply topic, the headers are lost.

How do I preserve the Kafka headers?

Here's my method signature:

@KafkaListener(topics = "input")
@SendTo("reply")
public List<CustomOutput> consume(List<CustomInput> inputs) {
  ... /* some processing */
  return outputs;
}

I've created a ProducerInterceptor so I can see which headers are being sent from the ReplyingKafkaTemplate, as well as from the @SendTo annotation. It showed another strange thing: the ReplyingKafkaTemplate is not adding the documented kafka_replyTopic header to the message.

Here's how the ReplyingKafkaTemplate is configured:

@Bean
public KafkaMessageListenerContainer<Object, Object> replyContainer(ConsumerFactory<Object, Object> cf) {
  ContainerProperties containerProperties = new ContainerProperties(requestReplyTopic);
  return new KafkaMessageListenerContainer<>(cf, containerProperties);
}

@Bean
public ReplyingKafkaTemplate<Object, Object, Object> replyingKafkaTemplate(ProducerFactory<Object, Object> pf, KafkaMessageListenerContainer<Object, Object> container) {
  return new ReplyingKafkaTemplate<>(pf, container);
}

I'm not sure if this is relevant, but I've added Spring Cloud Sleuth as a dependency as well, and the span/trace headers are there when I'm sending messages, but new ones are generated when a message is forwarded.

Upvotes: 0

Views: 3817

Answers (1)

Gary Russell

Reputation: 174494

By default, arbitrary headers from the request message are not copied to the reply message; only the kafka_correlationId is.

Starting with version 2.2, you can configure a ReplyHeadersConfigurer which is called to determine which header(s) should be copied.

See the documentation, which says:

"Starting with version 2.2, you can add a ReplyHeadersConfigurer to the listener container factory. This is consulted to determine which headers you want to set in the reply message."
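As a rough illustration of the 2.2 approach, the decision about which headers to copy can be written as a plain method with the same shape as ReplyHeadersConfigurer.shouldCopy(String headerName, Object headerValue). This is only a sketch: the class name ReplyHeaderRules and the allow-list are hypothetical, and "myHeader" is just the custom header used elsewhere in this answer. The wiring into the listener container factory is shown as a comment because it needs spring-kafka 2.2 or later on the classpath.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class ReplyHeaderRules {

    // Hypothetical allow-list of request headers to copy to the reply.
    private static final Set<String> COPIED_HEADERS =
            new HashSet<>(Arrays.asList("myHeader"));

    // Same parameters and return type as ReplyHeadersConfigurer.shouldCopy(...),
    // so it can be passed to the factory as a method reference.
    public static boolean shouldCopy(String headerName, Object headerValue) {
        return COPIED_HEADERS.contains(headerName);
    }

    public static void main(String[] args) {
        System.out.println(shouldCopy("myHeader", new byte[0]));        // prints true
        System.out.println(shouldCopy("someOtherHeader", new byte[0])); // prints false
    }
}

// Wiring sketch (assumes spring-kafka 2.2+, not compiled as part of this example):
//   factory.setReplyHeadersConfigurer(ReplyHeaderRules::shouldCopy);
// where factory is the ConcurrentKafkaListenerContainerFactory used by the @KafkaListener.
```

Keeping the rule in a plain static method makes it easy to unit test without starting a container; an inline lambda on the factory would work just as well.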

EDIT

BTW, in 2.2 the ReplyingKafkaTemplate sets the kafka_replyTopic header automatically if the record does not already have one.

With 2.1.x, it can be done, but it's a bit involved and you have to do some of the work yourself. The key is to receive and reply with a Message<?>:

@KafkaListener(id = "so55622224", topics = "so55622224")
@SendTo("dummy.we.use.the.header.instead") // placeholder; the reply goes to the topic set in the KafkaHeaders.TOPIC header below
public Message<?> listen(Message<String> in) {
    System.out.println(in);
    // Raw Kafka record headers from the request
    Headers nativeHeaders = in.getHeaders().get(KafkaHeaders.NATIVE_HEADERS, Headers.class);
    byte[] replyTo = nativeHeaders.lastHeader(KafkaHeaders.REPLY_TOPIC).value();
    byte[] correlation = nativeHeaders.lastHeader(KafkaHeaders.CORRELATION_ID).value();
    // Copy the headers we care about onto the reply by hand
    return MessageBuilder.withPayload(in.getPayload().toUpperCase())
            .setHeader("myHeader", nativeHeaders.lastHeader("myHeader").value())
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .build();
}

// This is used to send the reply - needs a header mapper
@Bean
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory) {
    KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
    MessagingMessageConverter messageConverter = new MessagingMessageConverter();
    messageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper("*")); // map all byte[] headers
    kafkaTemplate.setMessageConverter(messageConverter);
    return kafkaTemplate;
}

@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
    return args -> {
        Headers headers = new RecordHeaders();
        headers.add(new RecordHeader("myHeader", "myHeaderValue".getBytes()));
        headers.add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "so55622224.replies".getBytes())); // automatic in 2.2
        ProducerRecord<String, String> record = new ProducerRecord<>("so55622224", null, null, "foo", headers);
        RequestReplyFuture<String, String, String> future = template.sendAndReceive(record);
        ConsumerRecord<String, String> reply = future.get();
        System.out.println("Reply: " + reply.value() + " myHeader="
                + new String(reply.headers().lastHeader("myHeader").value()));
    };
}

Upvotes: 1
