Vasyl Sarzhynskyi

Reputation: 3955

Kafka streams fail on decoding timestamp metadata inside StreamTask

We got strange errors from Kafka Streams while starting the app:

java.lang.IllegalArgumentException: Illegal base64 character 7b
    at java.base/java.util.Base64$Decoder.decode0(Base64.java:743)
    at java.base/java.util.Base64$Decoder.decode(Base64.java:535)
    at java.base/java.util.Base64$Decoder.decode(Base64.java:558)
    at org.apache.kafka.streams.processor.internals.StreamTask.decodeTimestamp(StreamTask.java:985)
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeTaskTime(StreamTask.java:303)
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeMetadata(StreamTask.java:265)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.initializeNewTasks(AssignedTasks.java:71)
    at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:385)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)

and, as a result, an error about the failed stream: ERROR KafkaStreams - stream-client [xxx] All stream threads have died. The instance will be in error state and should be closed.

According to the code inside org.apache.kafka.streams.processor.internals.StreamTask, the failure happened due to an error while decoding the timestamp metadata (StreamTask.decodeTimestamp()). It happened in production, and we can't reproduce it on stage. What could be the root cause of such errors?
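
For reference, 7b is the hex code of the { character, so whatever is committed as offset metadata for this consumer group starts with a character outside the Base64 alphabet. A minimal snippet (the metadata string below is made up) that reproduces the same exception:

import java.util.Base64;

// Kafka Streams stores its partition time in the committed offset metadata as a Base64 string;
// metadata starting with '{' (0x7b) fails with exactly the exception from the stack trace above.
String metadata = "{\"not\": \"base64\"}";     // hypothetical, non-Base64 metadata value
Base64.getDecoder().decode(metadata);          // IllegalArgumentException: Illegal base64 character 7b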

Extra info: our app uses Kafka Streams and consumes messages from several Kafka brokers using the same application.id and state.dir (we are actually switching from one broker to another, but for some period we were connected to both brokers, so we had two Kafka Streams instances, one per broker). As I understand it, the consumer group lives on the broker side (so it shouldn't be a problem), but the state dir is on the client side. Maybe some race condition occurred due to using the same state.dir for two Kafka Streams instances? Could that be the root cause?

We use kafka-streams v2.4.0, kafka-clients v2.4.0, and Kafka Broker v1.1.1, with the following configs:

default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.timestamp.extractor: org.apache.kafka.streams.processor.WallclockTimestampExtractor
default.deserialization.exception.handler: org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
commit.interval.ms: 5000
num.stream.threads: 1
auto.offset.reset: latest
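
For context, these settings correspond to the following programmatic configuration (a sketch; the application.id, bootstrap.servers, and state.dir values are placeholders, not taken from our setup):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-id");            // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-host:9092");  // placeholder
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");    // placeholder
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");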

Upvotes: 3

Views: 763

Answers (1)

Vasyl Sarzhynskyi

Reputation: 3955

Finally, we figured out the root cause of the corrupted metadata for some consumer groups. It was one of our internal monitoring tools (written with pykafka) that corrupted the metadata of temporarily inactive consumer groups. The metadata was written as plain text rather than the Base64-encoded value Kafka Streams expects, and contained invalid data like the following: {"consumer_id": "", "hostname": "monitoring-xxx"}. To see what exactly is stored in the consumer metadata, we could use the following code:

import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

Map<String, Object> config = Map.of("group.id", "...", "bootstrap.servers", "...");
String topicName = "...";
Consumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
Set<TopicPartition> topicPartitions = kafkaConsumer.partitionsFor(topicName).stream()
        .map(partitionInfo -> new TopicPartition(topicName, partitionInfo.partition()))
        .collect(Collectors.toSet());
kafkaConsumer.committed(topicPartitions).forEach((key, value) ->
        System.out.println("Partition: " + key + " metadata: " + (value != null ? value.metadata() : null)));

Several options to fix already corrupted metadata:

  • change the consumer group to a new one. Be aware that you might lose or duplicate messages, depending on the latest or earliest offset reset policy, so in some cases this option might not be acceptable
  • overwrite the metadata manually (the timestamp is encoded according to the logic inside StreamTask.decodeTimestamp()). The snippet below continues from the inspection code above and uses org.apache.kafka.clients.consumer.OffsetAndMetadata. Alternatively, specify the metadata as Af//////////, which means NO_TIMESTAMP in Kafka Streams; a sketch for building such a value for an arbitrary timestamp follows after this list:

    Map<TopicPartition, OffsetAndMetadata> updatedTopicPartitionToOffsetMetadataMap = kafkaConsumer.committed(topicPartitions).entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, entry -> new OffsetAndMetadata(entry.getValue().offset(), "AQAAAXGhcf01")));
    kafkaConsumer.commitSync(updatedTopicPartitionToOffsetMetadataMap);
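
For completeness, here is a sketch of how such a metadata value could be built for an arbitrary timestamp instead of hard-coding AQAAAXGhcf01. The layout (one version byte with value 1 followed by the timestamp as a big-endian long, Base64-encoded) is an assumption derived from the internal StreamTask.decodeTimestamp() logic and the two sample values above, not a public API; encodeTimestamp is just an illustrative name:

import java.nio.ByteBuffer;
import java.util.Base64;

// Assumed metadata layout: 1 version byte (0x01) + 8-byte big-endian partition time in epoch millis.
static String encodeTimestamp(long partitionTimeMs) {
    ByteBuffer buffer = ByteBuffer.allocate(9);
    buffer.put((byte) 1);              // metadata version byte
    buffer.putLong(partitionTimeMs);   // use -1L for NO_TIMESTAMP
    return Base64.getEncoder().encodeToString(buffer.array());
}

Calling encodeTimestamp(-1L) returns Af//////////, matching the NO_TIMESTAMP value mentioned above.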

Upvotes: 2
