Reputation: 477
I am learning the Kafka Streams Processor API and came across the headers() method in ProcessorContext:
headers()
Returns the headers of the current input record; could be null if it is not available
What is this method used for? That one line is all the docs say about it.
Can I perform operations on the result, such as adding a header?
Upvotes: 3
Views: 766
Reputation: 1
The accepted answer is incomplete.
The header is indeed added to the record, but the record itself is not forwarded, so it never reaches the output topic.
For that, you need to call the forward method:
public void process(Record<RecordKey, RecordValue> record) {
    // add a header to the record
    record.headers().add("key", "value".getBytes());
    // forward the record downstream
    context.forward(record);
}
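A side note on the "value".getBytes() call in the snippet above: header values are raw byte arrays, so whoever reads the header has to decode it with the same charset that was used to write it. Below is a minimal sketch of a helper that pins the charset to UTF-8. HeaderValues and its two methods are hypothetical names of mine, not part of Kafka, and it only uses the JDK.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of Kafka): converts header values to and from text.
public final class HeaderValues {
    private HeaderValues() {}

    // Encode text as UTF-8 bytes, usable as the byte[] argument of Headers.add(key, value).
    public static byte[] encode(String text) {
        return text == null ? null : text.getBytes(StandardCharsets.UTF_8);
    }

    // Decode a header value back to text; a missing (null) value stays null.
    public static String decode(byte[] value) {
        return value == null ? null : new String(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] raw = encode("value");
        System.out.println(raw.length);   // prints 5
        System.out.println(decode(raw));  // prints value
    }
}
```

With this in place, the line in process() would become record.headers().add("key", HeaderValues.encode("value")), and a downstream reader could call HeaderValues.decode(...) on the header's value.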
Upvotes: 0
Reputation: 39860
A header is a piece of metadata, a key-value pair, that can be attached to each message. Headers are useful in various scenarios, for example to carry information that can later be used to filter records.
You can access a message's metadata through the Processor API, more precisely in process(), transform() and transformValues(). For example, the following adds a header to a record:
public void process(String key, String value) {
    // add a header to the current record; header values are byte arrays
    context().headers().add("key", "value".getBytes());
}
Upvotes: 3