Mohit Singh

Reputation: 477

What is the use of Header in Kafka Processor API?

I am learning the Kafka Processor API and found a headers() method in ProcessorContext:

headers()

Returns the headers of the current input record; could be null if it is not available

What is this method used for? The docs say nothing beyond that one line.

Can I perform operations on the returned headers, such as adding a new one?

Upvotes: 3

Views: 766

Answers (2)

Vasco Ferraz

Reputation: 1

The accepted answer is incomplete.

The header is in fact added to the record, but the record is not sent on to the output topic.

For that we need to call the forward method:

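// requires: import java.nio.charset.StandardCharsets;
public void process(Record<RecordKey, RecordValue> record) {

    // add a header to the record; header values are raw bytes
    record.headers().add("key", "value".getBytes(StandardCharsets.UTF_8));

    // forward the record, headers included, to the downstream processors
    context.forward(record);
}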

Upvotes: 0

Giorgos Myrianthous

Reputation: 39860

A header is a piece of metadata, a key with a byte-array value, that can be attached to each message. Headers are useful in various scenarios, for example to carry information that is later used to filter records.
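To make the filtering use case concrete, here is a minimal sketch in plain Java. It deliberately does not use the Kafka classes: `Message`, `withHeader` and `filterByHeader` are hypothetical stand-ins invented for this illustration, and the map-based header store is a simplification (real Kafka headers allow several entries with the same key). It only shows the idea that header values are raw bytes which you decode and compare to decide whether to keep a record.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HeaderFilterSketch {

    // Hypothetical stand-in for a record: a payload plus header key -> raw bytes.
    // This is NOT Kafka's Record or Headers type.
    static final class Message {
        final String value;
        final Map<String, byte[]> headers = new HashMap<>();

        Message(String value) {
            this.value = value;
        }

        // Header values are raw bytes, so strings are encoded explicitly.
        Message withHeader(String key, String headerValue) {
            headers.put(key, headerValue.getBytes(StandardCharsets.UTF_8));
            return this;
        }
    }

    // Keep only the payloads of messages whose header `key` decodes to `expected`.
    static List<String> filterByHeader(List<Message> messages, String key, String expected) {
        List<String> kept = new ArrayList<>();
        for (Message m : messages) {
            byte[] raw = m.headers.get(key); // null when the header is absent
            if (raw != null && expected.equals(new String(raw, StandardCharsets.UTF_8))) {
                kept.add(m.value);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Message> messages = new ArrayList<>();
        messages.add(new Message("order-1").withHeader("source", "web"));
        messages.add(new Message("order-2").withHeader("source", "mobile"));
        messages.add(new Message("order-3")); // no headers at all

        System.out.println(filterByHeader(messages, "source", "web")); // prints [order-1]
    }
}
```

In a real processor the same check would be made against the record's actual headers, with the same care for a missing header.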


You can access a message's metadata through the Processor API, more precisely inside process(), transform() and transformValues(). For example, to add a header to the current record, the following will do the trick:

public void process(String key, String value) {

    // add a header to the current record; the header value must be a byte[]
    context().headers().add("key", "value".getBytes());
}

Upvotes: 3
