Andy

Reputation: 147

NullPointerException + org.apache.kafka.common.header.internals.RecordHeader.key

I am getting the following exception in my microservice since my last release. As the exception is truncated (ellipsis ...), I am not able to make sense out of it.

As per my understanding, Kafka records can have a null key (the partitioner then uses a round-robin strategy to choose the partition).

I am out of ideas! Any idea as to what could be causing it?

{"@timestamp":"2023-02-21 15:58:16.947","@version":"1","message":"stream-client [Servicename-954725e4-a291-4fce-8f7e-77f8b5eeab6b] Encountered the following exception during processing and Kafka Streams opted to SHUTDOWN_CLIENT. The streams client is going to shut down now. ","logger":"org.apache.kafka.streams.KafkaStreams","thread":"Servicename-954725e4-a291-4fce-8f7e-77f8b5eeab6b-StreamThread-5","level":"ERROR","stacktrace":"org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_16, processor=KSTREAM-SOURCE-0000000000, topic=my-topic-name, partition=16, offset=263041154, stacktrace=java.lang.NullPointerException\n\tat org.apache.kafka.common.header.internals.RecordHeader.key(RecordHeader.java:45)\n\tat org.springframework.cloud.stream.binder.kafka.streams.AbstractKafkaStreamsBinderProcessor.lambda$null$6(AbstractKafkaStreamsBinderProcessor.java:498)\n\tat java.base/java.lang.Iterable.forEach(Iterable.java:75)\n\tat org.springframework.cloud.st...

Upvotes: 1

Views: 990

Answers (2)

gaurav kumar

Reputation: 129

I have encountered similar exceptions; they occur when multiple threads iterate over the RecordHeaders of a ConsumerRecord at the same time. Although a RecordHeader looks immutable, its key and value are decoded lazily from keyBuffer and valueBuffer without any synchronization. When two threads iterate over the record's headers concurrently, one thread can null out keyBuffer after decoding it while the other thread, still seeing key == null, dereferences the now-null buffer, which produces the NullPointerException.

Here was my previous implementation:

  public static Map<String, String> getMessageHeadersAsMap(ConsumerRecord<String, String> message) {
    Map<String, String> headers = new HashMap<>();
    // header.key() and header.value() lazily decode the underlying buffers,
    // which is not safe when several threads touch the same record.
    message.headers().forEach(header -> {
        headers.put(header.key(), new String(header.value()));
    });
    return headers;
  }

To solve this I either had to synchronize the whole method, or do what I did here: read each header field once into a local reference:

  public static Map<String, String> getMessageHeadersAsMap(ConsumerRecord<String, String> message) {
    Map<String, String> headers = new HashMap<>();
    message.headers().forEach(header -> {
      // Read key and value exactly once into locals before using them.
      String key = header.key();
      byte[] value = header.value();
      if (key != null) {
        headers.put(key, new String(value != null ? value : new byte[0]));
      }
    });
    return headers;
  }

The second version still doesn't give a 100% guarantee (the lazy decoding inside header.key() itself can still race), but it narrows the window and is considerably more robust against the NullPointerException.
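A more reliable pattern, if the record must be visible to more than one thread at all, is to decode the headers exactly once on the thread that owns the record and share only a plain snapshot. A minimal sketch of that idea (HeaderSnapshot and snapshot are hypothetical names, assuming kafka-clients on the classpath):

  import java.nio.charset.StandardCharsets;
  import java.util.Collections;
  import java.util.HashMap;
  import java.util.Map;

  import org.apache.kafka.clients.consumer.ConsumerRecord;

  public final class HeaderSnapshot {

      private HeaderSnapshot() {
      }

      // Decode every header once, on the thread that owns the record, so the
      // lazy buffers inside RecordHeader are never touched concurrently. The
      // returned map is an ordinary immutable snapshot that is safe to share.
      public static Map<String, String> snapshot(ConsumerRecord<?, ?> record) {
          Map<String, String> headers = new HashMap<>();
          record.headers().forEach(header -> {
              byte[] value = header.value(); // header values may legitimately be null
              headers.put(header.key(),
                      value == null ? "" : new String(value, StandardCharsets.UTF_8));
          });
          return Collections.unmodifiableMap(headers);
      }
  }

Because the map holds only plain Strings, nothing lazy or mutable ever crosses a thread boundary.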

Upvotes: 0

Gary Russell

Reputation: 174769

As per my understanding kafka records can have null for key...

That is true for record keys, but not for record header keys: a header's value may be null, its key may not.
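To illustrate the difference, a minimal sketch (the class name is hypothetical, assuming kafka-clients on the classpath): the record below is legal, while the header is rejected immediately by the constructor shown next.

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;

public class NullKeyDemo {
    public static void main(String[] args) {
        // A null record key is allowed; the partitioner chooses the partition.
        ProducerRecord<String, String> record =
                new ProducerRecord<>("my-topic-name", null, "payload");
        System.out.println(record);

        // A null header key is not: this throws NullPointerException.
        new RecordHeader((String) null, "v".getBytes());
    }
}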

You somehow have a RecordHeader (in record.headers()) with a null key, which should be impossible:

public RecordHeader(String key, byte[] value) {
    Objects.requireNonNull(key, "Null header keys are not permitted");
    this.key = key;
    this.value = value;
}

public RecordHeader(ByteBuffer keyBuffer, ByteBuffer valueBuffer) {
    this.keyBuffer = Objects.requireNonNull(keyBuffer, "Null header keys are not permitted");
    this.valueBuffer = valueBuffer;
}

The NPE is on the keyBuffer here, while trying to extract the key from the buffer:

public String key() {
    if (key == null) {
        key = Utils.utf8(keyBuffer, keyBuffer.remaining());
        keyBuffer = null;
    }
    return key;
}
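One plausible way to hit that NPE, consistent with the concurrency theory in the other answer, is two threads calling key() on the same header: the first decodes the key and nulls out keyBuffer, while the second still observes key == null and then calls Utils.utf8(null, ...). A timing-dependent sketch that can (but will not reliably) reproduce it, assuming kafka-clients on the classpath:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.internals.RecordHeader;

public class HeaderRaceDemo {

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 100_000; i++) {
            RecordHeader header = new RecordHeader(
                    ByteBuffer.wrap("key".getBytes(StandardCharsets.UTF_8)),
                    ByteBuffer.wrap("value".getBytes(StandardCharsets.UTF_8)));
            Runnable read = () -> {
                try {
                    header.key(); // lazy decode; not synchronized
                } catch (NullPointerException e) {
                    System.out.println("reproduced: " + e);
                }
            };
            Thread t1 = new Thread(read);
            Thread t2 = new Thread(read);
            t1.start();
            t2.start();
            t1.join();
            t2.join();
        }
    }
}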

Upvotes: 1
