Reputation: 423
I'm building a Spring Boot 2.4.4 application that produces messages to Kafka (Confluent Platform) with schema validation.
The schema is registered in the schema registry for the topic, and I've generated the POJO (LoanInitiate) from the associated AVSC file using the Maven Avro plugin. Sending with the following code works without issues:
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
...
@Autowired
private KafkaTemplate<String, LoanInitiate> kafkaTemplate;
@Value("${kafka.topic}")
private String topic;
public void regularSend() {
    LoanInitiate li = initializeLoanInitiate();
    kafkaTemplate.send(topic, li); // ok
}
However, if I attempt the following, I get an "Unsupported Avro type" error:
@Autowired
private KafkaTemplate<String, Message<LoanInitiate>> kafkaTemplate2;
public void sendMessage() {
    LoanInitiate li = initializeLoanInitiate();
    Message<LoanInitiate> message = MessageBuilder
            .withPayload(li)
            .setHeader(KafkaHeaders.CORRELATION_ID, "Correlation ID")
            .build();
    kafkaTemplate2.send(topic, message); // "Unsupported Avro type"
}
My producer configuration is as follows:
#Producer values
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
I understand that kafkaTemplate2 is trying to send a Message<LoanInitiate> and not a LoanInitiate, but it's not clear to me what I am missing in order to write both the message and the header.
How should I approach sending my message along with header information?
Upvotes: 3
Views: 11982
Reputation: 174779
With the version in the question, it depends on whether Jackson is on the classpath. If it is, the default MessagingMessageConverter gets a DefaultKafkaHeaderMapper, which can map headers of any JSON-friendly type; otherwise you get a SimpleKafkaHeaderMapper, which only maps headers with byte[] values.
So, if you add getBytes() to the header value, it should work.
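Kafka record headers carry raw bytes, so a String header value has to be encoded before it is written and decoded after it is read. Here is a minimal, library-free sketch of that round trip. The class and method names (HeaderBytes, encode, decode) are hypothetical and exist only for illustration; UTF-8 is an assumption, and any charset works as long as both sides agree on it.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Spring Kafka or the Kafka client.
final class HeaderBytes {

    private HeaderBytes() {
    }

    // Encode a String header value into the byte[] form a Kafka header stores.
    static byte[] encode(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Decode a header's byte[] value back into a String on the consuming side.
    static String decode(byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8);
    }
}
```

Passing an explicit charset avoids depending on the platform default, which matters when producer and consumer run on different machines.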
You can also add a MessagingMessageConverter configured with a SimpleKafkaHeaderMapper to the template, with the mapper's mapAllStringsOut property set to true; the mapper will then also map headers with String values.
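As a rough, untested sketch of that configuration: the class and method names below (HeaderSendingSketch, useSimpleHeaderMapper, sendWithHeader) are hypothetical. It assumes the message is sent through KafkaTemplate.send(Message<?>), with the topic supplied in the KafkaHeaders.TOPIC header, because that is the send variant that applies the template's message converter. The template is typed on the payload (for example KafkaTemplate<String, LoanInitiate>), not on Message<...>.

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SimpleKafkaHeaderMapper;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

// Hypothetical wiring class; requires spring-kafka on the classpath.
public class HeaderSendingSketch {

    // Replace the template's converter with one whose header mapper
    // also writes String-valued headers (converted to byte[]).
    public static <V> void useSimpleHeaderMapper(KafkaTemplate<String, V> template) {
        SimpleKafkaHeaderMapper headerMapper = new SimpleKafkaHeaderMapper();
        headerMapper.setMapAllStringsOut(true);

        MessagingMessageConverter converter = new MessagingMessageConverter();
        converter.setHeaderMapper(headerMapper);
        template.setMessageConverter(converter);
    }

    // Build a Message carrying the payload, the target topic and a String header,
    // then let the template convert it into a ProducerRecord.
    public static <V> void sendWithHeader(KafkaTemplate<String, V> template, String topic, V payload) {
        Message<V> message = MessageBuilder
                .withPayload(payload)
                .setHeader(KafkaHeaders.TOPIC, topic)
                .setHeader(KafkaHeaders.CORRELATION_ID, "Correlation ID")
                .build();
        template.send(message);
    }
}
```

With this shape, the value serializer only ever sees the payload object, while the headers travel through the header mapper.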
On the inbound side, you can map the header back to a String by setting the rawMappedHeaders property appropriately.
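A short, untested sketch of what that inbound setting might look like. The class and method names (InboundHeaderMapperSketch, inboundMapper) are hypothetical, and wiring the mapper into the listener's message converter is not shown. In the rawMappedHeaders map, a value of true asks the mapper to present the matching inbound header as a String instead of a byte[].

```java
import java.util.Collections;

import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SimpleKafkaHeaderMapper;

// Hypothetical factory class; requires spring-kafka on the classpath.
public class InboundHeaderMapperSketch {

    public static SimpleKafkaHeaderMapper inboundMapper() {
        SimpleKafkaHeaderMapper headerMapper = new SimpleKafkaHeaderMapper();
        // true = map this header to a String on the inbound side rather than leaving it as byte[]
        headerMapper.setRawMappedHeaders(
                Collections.singletonMap(KafkaHeaders.CORRELATION_ID, true));
        return headerMapper;
    }
}
```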
You can also use your own custom header mapper.
See https://docs.spring.io/spring-kafka/reference/html/#headers and https://docs.spring.io/spring-kafka/reference/html/#messaging-message-conversion as well as the Javadocs for the mappers.
Upvotes: 2
Reputation: 423
After working through several iterations, posting the question, and then taking a break, I found a solution using ProducerRecord:
ProducerRecord<String, LoanInitiate> producerRecord = new ProducerRecord<>(topic, li);
producerRecord
        .headers()
        .add(KafkaHeaders.CORRELATION_ID,
                "Correlation ID".getBytes(StandardCharsets.UTF_8));
kafkaTemplate.send(producerRecord);
Upvotes: 2