Timbuck
Timbuck

Reputation: 423

How to set message headers using KafkaTemplate?

I'm building a Spring Boot 2.4.4 application to produce message to Kafka (Confluent Platform), using schema validation. The schema is set up on the schema registry for the topic, and I've created the POJO (LoanInitiate) from the associated AVSC file using the Maven Avro plugin. When sending using the following, there are no issues:

import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
...

@Autowired
private KafkaTemplate<String, LoanInitiate> kafkaTemplate;

@Value("${kafka.topic}")
private String topic;


public void regularSend(){
    LoanInitiate li = initializeLoanInitiate();
    kafkaTemplate.send(topic, li); //ok
}

However, if I attempt the following, I will get Unsupported Avro type.

@Autowired
    private KafkaTemplate<String, Message<LoanInitiate>> kafkaTemplate2;

public void sendMessage(){
     Message<LoanInitiate> message = MessageBuilder
                                    .withPayload(li)
                                    .setHeader(KafkaHeaders.CORRELATION_ID, "Correlation ID")
                                    .build();
     kafkaTemplate2.send(topic, message); //"Unsupported Avro type"

}

My producer configuration is as follows:

#Producer values
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer

I understand that the kafkaTemplate2 is trying to send a Message<LoanInitiate> and not a LoanInitiate but it's not clear to me what I am missing to provide for writing the message and header.

How should I approach sending my message along with header information?

Upvotes: 3

Views: 11982

Answers (2)

Gary Russell
Gary Russell

Reputation: 174779

With the version in the question, it depends on whether Jackson is on the class path; if it is, the default MessagingMessageConverter gets a DefaultKafkaHeaderMapper which can map headers of any json-friendly type, otherwise you get a SimpleKafkaHeaderMapper which only maps headers with byte[] values.

So, if you add getBytes() to the value, it should work.

You can add a MessagingMessageConverter configured with a SimpleKafkaHeaderMapper to the template with its mapAllStringsOut set to true which will also map headers with String values.

On the inbound side, you can map back to a String by setting the rawMappedHeaders property appropriately.

You can also use your own custom header mapper.

See https://docs.spring.io/spring-kafka/reference/html/#headers and https://docs.spring.io/spring-kafka/reference/html/#messaging-message-conversion

as well as the Javadocs for the mappers.

Upvotes: 2

Timbuck
Timbuck

Reputation: 423

After working through several iterations, posting the question, and then taking a break I found a solution by using ProducerRecord

ProducerRecord<String, LoanInitiate> producerRecord = new ProducerRecord<>(topic, li);
            producerRecord
            .headers()
            .add(KafkaHeaders.CORRELATION_ID, 
            "Correlation ID".getBytes(StandardCharsets.UTF_8));

            kafkaTemplate.send(producerRecord);

Upvotes: 2

Related Questions