xeLL

Reputation: 537

How to add headers to Kafka message?

I need extra headers for my own needs, such as logging. I'm a little confused about how to add them; I believe they should be set at the point where I communicate directly via the send() method. How can I set custom headers on my Kafka message in a Spring app where I send like this:

final var streamMessage = StreamMessage
        .builder()
        .payload(Event.builder().eventId(eventId).result(result).build())
        .build();

ListenableFuture<SendResult<String, StreamMessage<?>>> future = kafkaTemplate.send(TOPIC, streamMessage);

And my Kafka configuration looks like this:

@Configuration
public class KafkaConfiguration {

    private final Map<String, Object> producerProps;
    private final Map<String, Object> consumerProps;

    @Autowired
    public KafkaConfiguration(@Value("${kafka.bootstrap.servers}") String bootstrapServers) {
        this.producerProps = producerProps(bootstrapServers);
        this.consumerProps = consumerProps(bootstrapServers);
    }

    private Map<String, Object> producerProps(String bootstrapServers) {
        final Map<String, Object> props = new ConcurrentHashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        return props;
    }

    private Map<String, Object> consumerProps(String bootstrapServers) {
        final Map<String, Object> props = new ConcurrentHashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        return props;
    }

    @Bean
    public ConsumerFactory<String, StreamMessage<?>> consumerFactory() {

        JsonDeserializer<StreamMessage<?>> deserializer = new JsonDeserializer<>(StreamMessage.class);

        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);

        return new DefaultKafkaConsumerFactory<>(consumerProps,
                new StringDeserializer(),
                deserializer);
    }

    @Bean
    public ProducerFactory<String, StreamMessage<?>> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerProps);
    }

    @Bean
    public KafkaTemplate<String, StreamMessage<?>> kafkaProducerTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, StreamMessage<?>> listenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, StreamMessage<?>> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

Upvotes: 1

Views: 10343

Answers (1)

xeLL

Reputation: 537

The last answer from Pim (Adding custom header using Spring Kafka) worked for me:
I was looking for an answer when I stumbled upon this question. However, I'm using the ProducerRecord<?, ?> class instead of Message<?>, so the header mapper does not seem to be relevant.
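
For completeness, if you do send a Spring Message<?>, headers can be set on the message itself and the header mapper copies them onto the Kafka record. A rough sketch of that alternative, reusing the streamMessage/kafkaTemplate/TOPIC names from the question (the "correlation-id" header name is made up):

import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

Message<StreamMessage<?>> message = MessageBuilder
        .withPayload(streamMessage)
        .setHeader(KafkaHeaders.TOPIC, TOPIC)        // reserved header: the target topic
        .setHeader("correlation-id", "some-id")      // custom header, mapped onto the record
        .build();
kafkaTemplate.send(message);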

Here is my approach to add a custom header:

var record = new ProducerRecord<String, String>(topicName, "Hello World");
record.headers().add("foo", "bar".getBytes());
kafkaTemplate.send(record);

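Applied to the StreamMessage producer from the question, the same idea would look roughly like this (TOPIC, streamMessage and kafkaTemplate are the names used there; "x-request-id" is a made-up header name):

// Build the record first, attach headers, then send it through the same template
ProducerRecord<String, StreamMessage<?>> record = new ProducerRecord<>(TOPIC, streamMessage);
record.headers().add("x-request-id", "some-id".getBytes());
kafkaTemplate.send(record);
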
Now to read the headers (before consuming), I've added a custom interceptor.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;

@Slf4j
public class MyConsumerInterceptor implements ConsumerInterceptor<Object, Object> {

    @Override
    public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> records) {
        Set<TopicPartition> partitions = records.partitions();
        partitions.forEach(partition -> interceptRecordsFromPartition(records.records(partition)));
        
        return records;
    }

    private void interceptRecordsFromPartition(List<ConsumerRecord<Object, Object>> records) {
        records.forEach(record -> {
            var myHeaders = new ArrayList<Header>();
            record.headers().headers("MyHeader").forEach(myHeaders::add);
            log.info("My Headers: {}", myHeaders);
            // Do with header as you see fit
        });
    }

    @Override public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}
    @Override public void close() {}
    @Override public void configure(Map<String, ?> configs) {}
}

The final bit is to register this interceptor with the Kafka consumer container, using the following (Spring Boot) configuration:

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class MessagingConfiguration {

    @Bean
    public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
        Map<String, Object> consumerProperties = properties.buildConsumerProperties();
        consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
        return new DefaultKafkaConsumerFactory<>(consumerProperties);
    }

}
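
As a side note: if you only need the header values inside the listener itself (rather than in an interceptor), Spring Kafka can also inject them directly into the @KafkaListener method. A rough sketch, assuming the listener container factory from the question and the "foo" header from above; the topic name is a placeholder, and raw Kafka headers arrive as byte[] by default:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;

@KafkaListener(topics = "my-topic", containerFactory = "listenerContainerFactory")
public void listen(@Payload StreamMessage<?> payload,
                   @Header(name = "foo", required = false) byte[] fooHeader) {
    // fooHeader is null when the record carries no "foo" header
}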

Upvotes: 1
