Reputation: 537
I need additional headers for special needs such as logging, and I'm a little confused about how to add them. I believe they should be added at the place where I communicate directly via the send() method. How can I set custom headers on my Kafka message in a Spring app where I send messages like this:
// Build the event payload first, then wrap it in the stream message envelope.
final var event = Event.builder()
        .eventId(eventId)
        .result(result)
        .build();
final var streamMessage = StreamMessage.builder().payload(event).build();

// Hand the message to the template; the returned future carries the send result.
ListenableFuture<SendResult<String, StreamMessage<?>>> future =
        kafkaTemplate.send(TOPIC, streamMessage);
And my Kafka configuration looks like this:
/**
 * Spring configuration that wires the Kafka producer and consumer infrastructure
 * for {@code StreamMessage} values with {@code String} keys.
 *
 * <p>Producer and consumer property maps are built once in the constructor from the
 * {@code kafka.bootstrap.servers} property and reused by the factory beans below.
 */
@Configuration
public class KafkaConfiguration {

    private final Map<String, Object> producerProps;
    private final Map<String, Object> consumerProps;

    /**
     * Creates the configuration and precomputes the client property maps.
     *
     * @param bootstrapServers value of the {@code kafka.bootstrap.servers} property
     */
    @Autowired
    public KafkaConfiguration(@Value("${kafka.bootstrap.servers}") String bootstrapServers) {
        this.producerProps = producerProps(bootstrapServers);
        this.consumerProps = consumerProps(bootstrapServers);
    }

    /**
     * Builds the producer client properties.
     *
     * @param bootstrapServers broker address list to connect to
     * @return property map with String keys serialized as-is and values serialized as JSON
     */
    private Map<String, Object> producerProps(String bootstrapServers) {
        final Map<String, Object> props = new ConcurrentHashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        return props;
    }

    /**
     * Builds the consumer client properties.
     *
     * @param bootstrapServers broker address list to connect to
     * @return property map with auto-commit disabled, offset reset {@code earliest}
     *         and at most 50 records per poll
     */
    private Map<String, Object> consumerProps(String bootstrapServers) {
        final Map<String, Object> props = new ConcurrentHashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        return props;
    }

    /**
     * Consumer factory using a String key deserializer and a JSON value deserializer
     * targeting {@code StreamMessage}.
     *
     * @return the consumer factory bean
     */
    @Bean
    public ConsumerFactory<String, StreamMessage<?>> consumerFactory() {
        JsonDeserializer<StreamMessage<?>> deserializer = new JsonDeserializer<>(StreamMessage.class);
        // Keep the type headers on the record so downstream code can still read them.
        deserializer.setRemoveTypeHeaders(false);
        // NOTE(review): "*" trusts every package named in a type header; narrow this to the
        // package of StreamMessage if records can come from producers you do not control.
        deserializer.addTrustedPackages("*");
        // This instance deserializes record VALUES, so the type mapper must not be switched
        // to the key type headers (the former setUseTypeMapperForKey(true) call was removed).
        return new DefaultKafkaConsumerFactory<>(consumerProps,
                new StringDeserializer(),
                deserializer);
    }

    /**
     * Producer factory built from the precomputed producer properties.
     *
     * @return the producer factory bean
     */
    @Bean
    public ProducerFactory<String, StreamMessage<?>> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerProps);
    }

    /**
     * Template used by application code to send {@code StreamMessage} records.
     *
     * @return the Kafka template bean
     */
    @Bean
    public KafkaTemplate<String, StreamMessage<?>> kafkaProducerTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Listener container factory backed by {@link #consumerFactory()}.
     *
     * @return the listener container factory bean
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, StreamMessage<?>> listenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, StreamMessage<?>> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
Upvotes: 1
Views: 10343
Reputation: 537
Last answer from Pim (Adding custom header using Spring Kafka) worked for me:
I was looking for an answer when I stumbled upon this question. However, I'm using the ProducerRecord<?, ?> class instead of Message<?>, so the header mapper does not seem to be relevant.
Here is my approach to add a custom header:
// Build the record explicitly so that headers can be attached before it is sent.
var record = new ProducerRecord<String, String>(topicName, "Hello World");
// Header values are raw bytes: encode with an explicit charset so that the bytes do not
// depend on the platform default encoding of the producing JVM.
record.headers().add("foo", "bar".getBytes(java.nio.charset.StandardCharsets.UTF_8));
kafkaTemplate.send(record);
Now to read the headers (before consuming), I've added a custom interceptor.
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
/**
 * Consumer interceptor that logs every {@code MyHeader} header of each consumed record
 * and passes the records through unchanged.
 */
@Slf4j
public class MyConsumerInterceptor implements ConsumerInterceptor<Object, Object> {

    /**
     * Walks the batch partition by partition, logs the headers of interest,
     * and returns the same batch instance.
     *
     * @param records the batch handed over by the consumer
     * @return {@code records}, unmodified
     */
    @Override
    public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> records) {
        for (TopicPartition topicPartition : records.partitions()) {
            logHeadersOf(records.records(topicPartition));
        }
        return records;
    }

    /** Logs, per record, the list of headers whose key is {@code MyHeader}. */
    private void logHeadersOf(List<ConsumerRecord<Object, Object>> partitionRecords) {
        for (ConsumerRecord<Object, Object> consumed : partitionRecords) {
            var myHeaders = new ArrayList<Header>();
            for (Header header : consumed.headers().headers("MyHeader")) {
                myHeaders.add(header);
            }
            log.info("My Headers: {}", myHeaders);
            // Do with header as you see fit
        }
    }

    /** No-op: nothing to do on commit. */
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }

    /** No-op: the interceptor holds no resources. */
    @Override
    public void close() {
    }

    /** No-op: the interceptor takes no configuration. */
    @Override
    public void configure(Map<String, ?> configs) {
    }
}
The final bit is to register this interceptor with the Kafka Consumer Container with the following (Spring Boot) configuration:
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
/**
 * Registers {@code MyConsumerInterceptor} with the Kafka consumer factory.
 */
@Configuration
public class MessagingConfiguration {

    /**
     * Builds a consumer factory from the Spring Boot Kafka properties with the
     * interceptor class added under {@code interceptor.classes}.
     *
     * @param properties the Spring Boot Kafka properties
     * @return consumer factory configured with the interceptor
     */
    @Bean
    public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
        final String interceptorClassName = MyConsumerInterceptor.class.getName();
        final Map<String, Object> config = properties.buildConsumerProperties();
        config.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptorClassName);
        return new DefaultKafkaConsumerFactory<>(config);
    }
}
Upvotes: 1