haukeh

Reputation: 182

Kafka Streams: Custom TimestampExtractor for aggregation

I am building a pretty straightforward KafkaStreams demo application, to test a use case.

I am not able to upgrade the Kafka broker I am using (which is currently on version 0.10.0), and there are several messages written by a pre-0.10.0 producer, so I am using a custom TimestampExtractor, which I add as the default to the config at the beginning of my main class:

config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, GenericRecordTimestampExtractor.class);
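
For reference, a minimal sketch of what such an extractor might look like (this is not my exact implementation; the Avro GenericRecord value and the "timestamp" field name are just assumptions for illustration):

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class GenericRecordTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        // Use the broker/producer timestamp if it is valid.
        final long timestamp = record.timestamp();
        if (timestamp >= 0) {
            return timestamp;
        }
        // Fall back to a timestamp field embedded in the Avro record
        // ("timestamp" is a hypothetical field name, used here only for illustration).
        final Object value = record.value();
        if (value instanceof GenericRecord && ((GenericRecord) value).get("timestamp") != null) {
            return (Long) ((GenericRecord) value).get("timestamp");
        }
        // Last resort: wall-clock time, so processing does not fail.
        return System.currentTimeMillis();
    }
}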

When consuming from my source topic, this works perfectly fine. But as soon as I use an aggregation operator, I run into an exception, because the FailOnInvalidTimestamp implementation of TimestampExtractor is used instead of my custom implementation when the internal aggregation (repartition) topic is consumed.

The code of the Streams app looks something like this:

...

KStream<String, MyValueClass> clickStream = streamsBuilder
              .stream("mytopic", Consumed.with(Serdes.String(), valueClassSerde));

KTable<Windowed<Long>, Long> clicksByCustomerId = clickStream
              .map((key, value) -> new KeyValue<>(value.getId(), value))
              .groupByKey(Serialized.with(Serdes.Long(), valueClassSerde))
              .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(1)))
              .count();
...

The Exception I'm encountering is the following:

    Exception in thread "click-aggregator-b9d77f2e-0263-4fa3-bec4-e48d4d6602ab-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: 
Input record ConsumerRecord(topic = click-aggregator-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition, partition = 9, offset = 0, CreateTime = -1, serialized key size = 8, serialized value size = 652, headers = RecordHeaders(headers = [], isReadOnly = false), key = 11230, value = org.example.MyValueClass@2a3f2ea2) has invalid (negative) timestamp. 
Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.

Now the question is: Is there any way I can make Kafka Streams use the custom TimestampExtractor when reading from the internal aggregation topic (ideally while still using the Streams DSL)?

Upvotes: 8

Views: 6940

Answers (3)

It is a well-known issue :-). I have the same problem in projects that still use older Kafka clients like 0.9, and also when communicating with some "not certified" .NET clients.

Therefore I wrote a dedicated class:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class MyTimestampExtractor implements TimestampExtractor {

    private static final Logger LOG = LogManager.getLogger( MyTimestampExtractor.class );

    @Override
    public long extract ( ConsumerRecord<Object, Object> consumerRecord, long previousTimestamp ) {
        final long timestamp = consumerRecord.timestamp();

        if ( timestamp < 0 ) {
            // The record carries no valid timestamp (e.g. written by a pre-0.10 producer),
            // so fall back to the local wall-clock time instead of failing.
            final String msg = consumerRecord.toString().trim();
            LOG.warn( "Record has wrong Kafka timestamp: {}. It will be patched with local timestamp. Details: {}", timestamp, msg );
            return System.currentTimeMillis();
        }

        return timestamp;
    }
}

If there are many such messages, you may want to skip the logging, as it can flood the log.
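
To wire it in, register the class as the default extractor in the Streams configuration, the same way as in the question (a minimal sketch; the application id and bootstrap servers below are just placeholders):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "click-aggregator");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Replaces the default FailOnInvalidTimestamp for all source topics.
config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyTimestampExtractor.class);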

Upvotes: 1

Matthias J. Sax

Reputation: 62285

You cannot change the timestamp extractor that is used for internal topics (as of v1.0.0). This is not allowed for correctness reasons.

But I am really wondering how a record with timestamp -1 got written into this topic in the first place. Kafka Streams uses the timestamp that was provided by your custom extractor when it writes the record to the repartition topic. Also note that KafkaProducer does not allow writing records with a negative timestamp.

Thus, the only explanation I can think of is that some other producer wrote into the repartition topic -- and this is not allowed. Only Kafka Streams should write into its repartition topics.

I guess you will need to delete this topic and let Kafka Streams recreate it to get back into a clean state.
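
If you want to do the cleanup from code rather than with the application reset tool that ships with Kafka, a sketch using the AdminClient from the 1.0.0 client library could look like this (it assumes topic deletion is enabled on the brokers; the topic name is the one from the exception above, and you should stop the Streams application before deleting the topic):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// Delete the repartition topic; Kafka Streams recreates it on the next start.
// all().get() throws InterruptedException/ExecutionException -- handle or rethrow as needed.
try (AdminClient admin = AdminClient.create(props)) {
    admin.deleteTopics(Collections.singleton(
            "click-aggregator-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition"))
         .all()
         .get();
}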

From the discussion/comments on the other answer:

You need the 0.10+ message format to work with Kafka Streams. If you upgrade your brokers but keep the 0.9 message format or older, Kafka Streams might not work as expected.

Upvotes: 7

haukeh

Reputation: 182

After reading Matthias' answer I double-checked everything, and the cause of the issue was an incompatibility between the Kafka broker and the Kafka Streams app. I was stupid enough to use Kafka Streams 1.0.0 with a 0.10.1.1 broker, which is clearly stated as incompatible in the Kafka wiki.

Edit (thanks to Matthias): The actual cause of the problem was that the log format used by our 0.10.1.x broker was still 0.9.0.x, which is incompatible with Kafka Streams.

Upvotes: 0
