Reputation: 3955
We got strange errors on Kafka Streams during starting app
java.lang.IllegalArgumentException: Illegal base64 character 7b
at java.base/java.util.Base64$Decoder.decode0(Base64.java:743)
at java.base/java.util.Base64$Decoder.decode(Base64.java:535)
at java.base/java.util.Base64$Decoder.decode(Base64.java:558)
at org.apache.kafka.streams.processor.internals.StreamTask.decodeTimestamp(StreamTask.java:985)
at org.apache.kafka.streams.processor.internals.StreamTask.initializeTaskTime(StreamTask.java:303)
at org.apache.kafka.streams.processor.internals.StreamTask.initializeMetadata(StreamTask.java:265)
at org.apache.kafka.streams.processor.internals.AssignedTasks.initializeNewTasks(AssignedTasks.java:71)
at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:385)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
and, as a result, error about failed stream: ERROR KafkaStreams - stream-client [xxx] All stream threads have died. The instance will be in error state and should be closed.
According to code inside org.apache.kafka.streams.processor.internals.StreamTask
, failure happened due to error in decoding timestamp metadata (StreamTask.decodeTimestamp()
). It happened on prod, and can't reproduce on stage.
What could be the root cause of such errors?
Extra info: our app uses Kafka-Streams and consumes messages from several kafka brokers using the same application.id
and state.dir
(actually we switch from one broker to another, but during some period we connected to both brokers, so we have two kafka streams, one per each broker). As I understand, consumer group lives on broker side (so shouldn't be a problem), but state dir is on client side. Maybe some race condition occurred due to using the same state.dir
for two kafka streams? could it be the root cause?
We use kafka-streams
v.2.4.0
, kafka-clients
v.2.4.0
, Kafka Broker v.1.1.1
, with the following configs:
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.timestamp.extractor: org.apache.kafka.streams.processor.WallclockTimestampExtractor
default.deserialization.exception.handler: org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
commit.interval.ms: 5000
num.stream.threads: 1
auto.offset.reset: latest
Upvotes: 3
Views: 763
Reputation: 3955
Finally, we figured out what is the root cause of corrupted metadata by some consumer groups.
It was one of our internal monitoring tool (written with pykafka
) that corrupted metadata by temporarily inactive consumer groups.
Metadata were unencrupted and contained invalid data like the following: {"consumer_id": "", "hostname": "monitoring-xxx"}
.
In order to understand what exactly we have in consumer metadata, we could use the following code:
Map<String, Object> config = Map.of( "group.id", "...", "bootstrap.servers", "...");
String topicName = "...";
Consumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
Set<TopicPartition> topicPartitions = kafkaConsumer.partitionsFor(topicName).stream()
.map(partitionInfo -> new TopicPartition(topicName, partitionInfo.partition()))
.collect(Collectors.toSet());
kafkaConsumer.committed(topicPartitions).forEach((key, value) ->
System.out.println("Partition: " + key + " metadata: " + (value != null ? value.metadata() : null)));
Several options to fix already corrupted metadata:
latest
or earliest
offset reset policy. so for some cases, this option might be not acceptableoverwrite metadata manually (timestamp is encoded according to logic inside StreamTask.decodeTimestamp()
):
Map<TopicPartition, OffsetAndMetadata> updatedTopicPartitionToOffsetMetadataMap = kafkaConsumer.committed(topicPartitions).entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, (entry) -> new OffsetAndMetadata((entry.getValue()).offset(), "AQAAAXGhcf01")));
kafkaConsumer.commitSync(updatedTopicPartitionToOffsetMetadataMap);
or specify metadata as Af//////////
that means NO_TIMESTAMP
in Kafka Streams.
Upvotes: 2