Reputation: 147
I am getting the following exception on my microservice since my last release. As the exception is not complete (elipise ...), I am not able to make sense out it.
As per my understanding kafka records can have null for key (as then RoundRobinStragey is used to choose the correct partition).
I am out of ideas! Any idea as to what could be causing it?
{"@timestamp":"2023-02-21 15:58:16.947","@version":"1","message":"stream-client [Servicename-954725e4-a291-4fce-8f7e-77f8b5eeab6b] Encountered the following exception during processing and Kafka Streams opted to SHUTDOWN_CLIENT. The streams client is going to shut down now. ","logger":"org.apache.kafka.streams.KafkaStreams","thread":"Servicename-954725e4-a291-4fce-8f7e-77f8b5eeab6b-StreamThread-5","level":"ERROR","stacktrace":"org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_16, processor=KSTREAM-SOURCE-0000000000, topic=my-topic-name, partition=16, offset=263041154, stacktrace=java.lang.NullPointerException\n\tat org.apache.kafka.common.header.internals.RecordHeader.key(RecordHeader.java:45)\n\tat org.springframework.cloud.stream.binder.kafka.streams.AbstractKafkaStreamsBinderProcessor.lambda$null$6(AbstractKafkaStreamsBinderProcessor.java:498)\n\tat java.base/java.lang.Iterable.forEach(Iterable.java:75)\n\tat org.springframework.cloud.st...
Upvotes: 1
Views: 990
Reputation: 129
I have encountered similar exceptions, which occur when multiple threads simultaneously iterate over the RecordHeaders
in a ConsumerRecord
. Although RecordHeader
itself is immutable, the keyBuffer
and valueBuffer
are not. As a result, when two threads iterate over the ConsumerRecord
headers concurrently, one thread may fail to clear the valueBuffer
, leading to a NullPointerException
.
Here was my previous iteration :
public static Map<String, String> getMessageHeadersAsMap(ConsumerRecord<String, String> message) {
Map<String, String> headers = new HashMap<>();
message.headers().forEach(header -> {
headers.put(header.key(), new String(header.value()));
});
return headers;
}
To solve this either i had to synchronize the whole method or i did this by using local reference :
public static Map<String, String> getMessageHeadersAsMap(ConsumerRecord<String, String> message) {
Map<String, String> headers = new HashMap<>();
message.headers().forEach(header -> {
String key = header.key();
byte[] value = header.value();
if (key != null) {
headers.put(key, new String(value != null ? value : EMPTY_BYTE_ARRAY));
}
});
return headers;
}
While the second still doesn't give 100% guarantee, but it provides more accuracy as well as more robustness to nullPointerException
Upvotes: 0
Reputation: 174769
As per my understanding kafka records can have null for key...
That is true for records, but not for record headers (each of which has a key and value).
You somehow have a RecordHeader
(in record.headers()
) with a null key, which should be impossible:
public RecordHeader(String key, byte[] value) {
Objects.requireNonNull(key, "Null header keys are not permitted");
this.key = key;
this.value = value;
}
public RecordHeader(ByteBuffer keyBuffer, ByteBuffer valueBuffer) {
this.keyBuffer = Objects.requireNonNull(keyBuffer, "Null header keys are not permitted");
this.valueBuffer = valueBuffer;
}
The NPE is on the keyBuffer
here, while trying to extract the key from the buffer:
public String key() {
if (key == null) {
key = Utils.utf8(keyBuffer, keyBuffer.remaining());
keyBuffer = null;
}
return key;
}
Upvotes: 1