Alper Kanat

Kafka Streams Windowing with Custom TimestampExtractor

I'm trying to create a Kafka Streams application that calculates unique devices per platform within a time window.

Event Class

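import java.time.ZonedDateTime;

public class Event {
    private String eventId;
    private String deviceId;
    private String platform;
    private ZonedDateTime createdAt;

    // Getters (getEventId(), getDeviceId(), getPlatform(), getCreatedAt()) omitted
}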

I need the time window to respect the event's createdAt, so I wrote a TimestampExtractor implementation like the one below:

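import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.ZonedDateTime;

public class EventTimestampExtractor implements TimestampExtractor {
    private static final Logger log = LoggerFactory.getLogger(EventTimestampExtractor.class);

    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
        // Use the event's own creation time, not the Kafka record timestamp, for windowing
        final Event event = (Event) record.value();
        final ZonedDateTime eventCreationTime = event.getCreatedAt();
        final long timestamp = eventCreationTime.toEpochSecond();

        log.trace("Event ({}) yielded timestamp: {}", event.getEventId(), timestamp);

        return timestamp;
    }
}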

Lastly, here's my streaming app code:

final KStream<String, Event> eventStream = builder.stream("events_ingestion");

eventStream
    .selectKey((key, event) -> {
        // Re-key each event as "<platform>::<deviceId>"
        final String platform = event.getPlatform();
        final String deviceId = event.getDeviceId();

        return String.join("::", platform, deviceId);
    })
    .groupByKey()
    .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15)))
    .count(Materialized.as(COUNT_STORE));

When I push an event into the events_ingestion topic, I can see the timestamp in the application logs and data being written into the count store.

When I iterate over the count store, I see the following:

Key: [ANDROID::1@1539000000/1539900000], Value: 2

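Although my time window is 15 minutes, the key appears to span about 10 days (1539900000 - 1539000000 = 900,000, read as seconds). If I remove my TimestampExtractor implementation from the stream config (and hence fall back to the default extractor, which uses the timestamp embedded in each Kafka record), the key spans 15 minutes as expected: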

Key: [ANDROID::1@1539256500000/1539257400000], Value: 1

What am I doing wrong here? Any ideas?

Answers (1)

talhaocakci

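A TimestampExtractor must return the timestamp in epoch milliseconds, because that is the unit Kafka Streams uses for windowing. You are returning epoch seconds (toEpochSecond()), which puts each message into the wrong time window. Return eventCreationTime.toInstant().toEpochMilli() instead.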
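To see why the keys in the question look the way they do, here is a plain-Java sketch with no Kafka dependency. It assumes an event created at 2018-10-11T11:20:00Z (a hypothetical value chosen to fall inside the window from the processing-time run) and assumes the epoch-aligned tumbling-window arithmetic (windowStart = timestamp - timestamp % size) that applies to TimeWindows without an advance. The class and method names are illustrative, not part of any library.

```java
import java.time.ZonedDateTime;

public class WindowAlignmentDemo {

    // 15-minute tumbling window, in milliseconds as Kafka Streams expects
    public static final long WINDOW_SIZE_MS = 15 * 60 * 1000L;

    // What the original extractor returned: epoch SECONDS
    public static long buggyTimestamp(ZonedDateTime createdAt) {
        return createdAt.toEpochSecond();
    }

    // What a TimestampExtractor should return: epoch MILLISECONDS
    public static long fixedTimestamp(ZonedDateTime createdAt) {
        return createdAt.toInstant().toEpochMilli();
    }

    // Start of the epoch-aligned tumbling window containing a non-negative timestamp
    public static long windowStart(long timestamp, long sizeMs) {
        return timestamp - (timestamp % sizeMs);
    }

    public static void main(String[] args) {
        ZonedDateTime createdAt = ZonedDateTime.parse("2018-10-11T11:20:00Z");

        long buggyStart = windowStart(buggyTimestamp(createdAt), WINDOW_SIZE_MS);
        long fixedStart = windowStart(fixedTimestamp(createdAt), WINDOW_SIZE_MS);

        // Seconds treated as milliseconds: [1539000000/1539900000]
        System.out.println("seconds: [" + buggyStart + "/" + (buggyStart + WINDOW_SIZE_MS) + "]");
        // Real milliseconds: [1539256500000/1539257400000]
        System.out.println("millis:  [" + fixedStart + "/" + (fixedStart + WINDOW_SIZE_MS) + "]");
    }
}
```

The seconds value 1539256800 is read as milliseconds, so it falls into the 900,000 ms window starting at 1539000000, which is exactly the key from the question. With real milliseconds, the same event lands in the 15-minute window [1539256500000/1539257400000].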
