Geeta Puri
Geeta Puri

Reputation: 51

Produce Record to Kafka

I am trying to produce a record to Kafka without passing any partition value but want to send a header and I have the below constructor methods to produce record to Kafka:

ProducerRecord​(java.lang.String topic, java.lang.Integer partition, java.lang.Long timestamp, K key, V value)  

Creates a record with a specified timestamp to be sent to a specified topic and partition

ProducerRecord​(java.lang.String topic, java.lang.Integer partition, java.lang.Long timestamp, K key, V value, java.lang.Iterable<Header> headers)  

Creates a record with a specified timestamp to be sent to a specified topic and partition

ProducerRecord​(java.lang.String topic, java.lang.Integer partition, K key, V value)    

Creates a record to be sent to a specified topic and partition

ProducerRecord​(java.lang.String topic, java.lang.Integer partition, K key, V value, java.lang.Iterable<Header> headers)    

Creates a record to be sent to a specified topic and partition

ProducerRecord​(java.lang.String topic, K key, V value) 

Create a record to be sent to Kafka

ProducerRecord​(java.lang.String topic, V value)    

Create a record with no key

In all the above methods, there is no way I can send a header without sending a partition value, and if I set partition as null I am getting NullPointerException.

Can you please advice me how to produce record in Kafka by sending a header and not partition value.

Upvotes: 2

Views: 2066

Answers (2)

Rohit Yadav
Rohit Yadav

Reputation: 2578

Kafka ProducerRecord Instance provides the Headers Instance where we can add key-value pairs.

You can go through the Kafka source code to get complete information.

https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java

Example

I tried with kafka-clients-0.8.2.0 but this functionality is not available that version. so you need to check also Kafka's client version also.

The example given below is using Kafka Client - 2.1.0.

        ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, key, message);
        Headers headers = record.headers();
        headers.add(KafkaHeaders.HEADER_CLIENT_IP, Strings.bytes(Network.localHostAddress()));
        headers.add(KafkaHeaders.HEADER_CLIENT, Strings.bytes(logManager.appName));
        producer.send(record);

Upvotes: 0

mazaneicha
mazaneicha

Reputation: 9425

ProducerRecord object has method headers() which returns the Headers interface, which in turn allows you to add() any header you like.

Upvotes: 1

Related Questions