Reputation: 15
I am new to Kafka. I have set up my environment to produce and consume records, but my goal is to intercept records and modify their values before they reach the intended consumer. For now, to make sure the environment is set up correctly for interception, I am writing a simple ConsumerInterceptor that intercepts records and prints their values. What should my configure() method implement to enable my ConsumerInterceptor? What other configuration should I add or modify, and where?
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class SimpleConsumerInterceptors<K, V> implements ConsumerInterceptor<K, V> {
    @Override
    public void configure(final Map<String, ?> configs) {
        // What configuration is required to enable my ConsumerInterceptor?
    }

    @Override
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
        // Print the value of every record, partition by partition.
        for (TopicPartition partition : records.partitions()) {
            for (ConsumerRecord<K, V> record : records.records(partition)) {
                System.out.println("onConsume:");
                System.out.println(record.value());
            }
        }
        // Hand the batch on to the consumer unchanged.
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        System.out.println("onCommit");
    }

    @Override
    public void close() {
        // No resources to release.
        System.out.println("close");
    }
}
Upvotes: 0
Views: 3250
Reputation: 174554
All you have to do is add the class name to the interceptor.classes consumer property, ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG.
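As a rough illustration of that property, here is a minimal sketch that builds the consumer properties using only java.util.Properties. The class and method names (InterceptorConfigSketch, consumerProps), the broker address, and the group id are placeholders made up for this example. The string key "interceptor.classes" is the value behind ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG.

```java
import java.util.Properties;

// Hypothetical helper class, used only for illustration.
class InterceptorConfigSketch {
    // Builds consumer properties that register an interceptor by its class name.
    static Properties consumerProps(String interceptorClassName) {
        Properties props = new Properties();
        // Placeholder connection settings; adjust these for your environment.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "demo-group");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Same key as ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG.
        props.put("interceptor.classes", interceptorClassName);
        return props;
    }
}
```

The resulting Properties object would then be passed to the KafkaConsumer constructor, with the fully qualified name of your interceptor class as the argument.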
You only have to implement configure() if you want to configure the interceptor after Kafka creates the instance. The map passed into the method is the map of consumer properties.
So, let's say you want to make the logging optional; add my.interceptor.logging.enabled=true to the consumer config, and use it in the configure() method to configure whether the records are logged.
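A minimal sketch of that idea follows, kept free of Kafka dependencies so it runs on its own. The class name LoggingFlagSketch and the isLoggingEnabled() accessor are hypothetical. In a real interceptor the configure() body would live in the ConsumerInterceptor implementation, and onConsume() would check the flag before printing.

```java
import java.util.Map;

// Hypothetical stand-in for the interceptor; only the flag handling is shown.
class LoggingFlagSketch {
    // Custom property name from the example above; it is not a built-in Kafka setting.
    static final String LOGGING_ENABLED = "my.interceptor.logging.enabled";

    private boolean loggingEnabled;

    // Same signature as ConsumerInterceptor.configure(); receives the consumer properties.
    public void configure(Map<String, ?> configs) {
        Object value = configs.get(LOGGING_ENABLED);
        // The value may arrive as a String or a Boolean; a missing key leaves logging off.
        loggingEnabled = value != null && Boolean.parseBoolean(value.toString());
    }

    boolean isLoggingEnabled() {
        return loggingEnabled;
    }
}
```

Defaulting to "off" when the property is absent is a design choice made for this sketch; the opposite default would work just as well. Kafka may log a warning about a property it does not recognise, but the entry should still reach configure().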
Upvotes: 1