How to use Kafka time window for historical aggregation?

I need to create a state store with the number of authenticated users per day, so that I can get the number of authenticated users in the last day, the last 7 days, and the last 30 days. To achieve this, every authentication event is sent to the auth-event topic. I am streaming this topic and creating a window for every day. Code:

KStream<String, GenericRecord> authStream = builder.stream("auth-event",
        Consumed.with(stringSerde, valueSerde)
                .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST)
                .withTimestampExtractor(new TransactionTimestampExtractor()));

authStream
        .groupBy((String key, GenericRecord value) -> value.get("tenantId").toString(),
                Grouped.with(Serdes.String(), valueSerde))
        .windowedBy(TimeWindows.of(Duration.ofDays(1)))
        .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("auth-result-store")
                .withKeySerde(stringSerde)
                .withValueSerde(longSerde))
        .suppress(Suppressed.untilWindowCloses(unbounded()))
        .toStream()
        .to("auth-result-topic", Produced.with(timeWindowedSerdeFrom(String.class), Serdes.Long()));

After that, I insert records into the topic. I also have a REST controller that reads the store using ReadOnlyWindowStore. The days parameter is sent from the UI and can be 1, 7, or 30. For days = 7, that means I would like to read the last 7 windows. Code:

final ReadOnlyWindowStore<String, Long> dayStore =
        kafkaStreams.store(KStreamsLdapsExample.authResultTable, QueryableStoreTypes.windowStore());

Instant timeFrom = Instant.now().minus(Duration.ofDays(days));

// Daily windows are aligned to UTC, so compute "end of today" in UTC as well
LocalDate currentDate = LocalDate.now(ZoneOffset.UTC);
LocalDateTime currentDayTime = currentDate.atTime(23, 59, 59);
Instant timeTo = currentDayTime.toInstant(ZoneOffset.UTC);

try (WindowStoreIterator<Long> it1 = dayStore.fetch(tenant, timeFrom, timeTo)) {
    long count = 0L;
    JsonObject jsonObject = new JsonObject();
    while (it1.hasNext()) {
        // key = window start time (epoch millis), value = count for that window
        final KeyValue<Long, Long> next = it1.next();
        Date resultDate = new Date(next.key);
        jsonObject.addProperty(resultDate.toString(), next.value);
        count += next.value;
    }

    jsonObject.addProperty("tenant", tenant);
    jsonObject.addProperty("Total number of events", count);

    return ResponseEntity.ok(jsonObject.toString());
}
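A note on which windows this query returns: Kafka Streams aligns TimeWindows to the epoch, so each daily window starts at 00:00 UTC, and ReadOnlyWindowStore.fetch(key, timeFrom, timeTo) matches windows whose start timestamp falls in the inclusive range [timeFrom, timeTo]. The sketch below computes the start times of the last N daily windows so that the query bounds line up exactly with window starts. DailyWindows, windowStart and lastNWindowStarts are hypothetical helpers, not Kafka API, and selecting the right range cannot bring back windows the store has already expired.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka Streams.
// Assumes daily tumbling windows aligned to the epoch (00:00 UTC).
final class DailyWindows {
    static final long DAY_MS = Duration.ofDays(1).toMillis();

    // Start of the epoch-aligned daily window that contains instant t.
    static Instant windowStart(Instant t) {
        long ms = t.toEpochMilli();
        return Instant.ofEpochMilli(ms - Math.floorMod(ms, DAY_MS));
    }

    // Start times of the last n daily windows, oldest first,
    // ending with the window that contains 'now'.
    static List<Instant> lastNWindowStarts(Instant now, int n) {
        Instant current = windowStart(now);
        List<Instant> starts = new ArrayList<>();
        for (int i = n - 1; i >= 0; i--) {
            starts.add(current.minus(Duration.ofDays(i)));
        }
        return starts;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2020-03-10T15:30:00Z");
        List<Instant> starts = lastNWindowStarts(now, 7);
        // Candidate bounds for fetch(key, timeFrom, timeTo); both are inclusive window starts
        System.out.println("timeFrom = " + starts.get(0));
        System.out.println("timeTo   = " + starts.get(starts.size() - 1));
    }
}
```

With days = 7, timeFrom and timeTo would be the first and last entries of the returned list.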

The first problem is that I can only get results for 1-2 days; after that, older windows are lost. The second problem is the data written to the output topic, auth-result-topic. When I read the results with the console consumer, there are a lot of empty records (no key, no value) and some records containing random-looking numbers.

Any idea what is going on with my store? How can I read the past N windows? Thanks


Answers (1)

Matthias J. Sax

You will need to increase the store retention time (the default is 1 day) via Materialized.as(...).withRetention(...), which you can pass into the count() operator.

You may also want to increase the window grace period via TimeWindows.of(Duration.ofDays(1)).grace(...).
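To size the retention, a rough rule is: keep at least (number of windows to query) times (window size), and never less than window size + grace, which Kafka Streams requires of a windowed store's retention. The first part is an assumption based on windows expiring relative to stream time minus retention, not a documented formula. RetentionSizing.minRetention is a hypothetical helper that only illustrates this arithmetic; its result would then go into Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("auth-result-store").withRetention(retention), together with TimeWindows.of(Duration.ofDays(1)).grace(grace).

```java
import java.time.Duration;

// Hypothetical helper, not part of Kafka Streams. Assumptions:
//  (a) retention must be >= window size + grace (enforced by Kafka Streams)
//  (b) a window is expired once its start is older than stream time - retention
final class RetentionSizing {

    // Smallest retention that keeps the last 'windowsToKeep' windows queryable
    // under the assumptions above.
    static Duration minRetention(Duration windowSize, Duration grace, int windowsToKeep) {
        Duration history = windowSize.multipliedBy(windowsToKeep);
        Duration required = windowSize.plus(grace);
        return history.compareTo(required) >= 0 ? history : required;
    }

    public static void main(String[] args) {
        Duration size = Duration.ofDays(1);
        Duration grace = Duration.ofHours(6); // example value, pick what fits your late data
        Duration retention = minRetention(size, grace, 30);
        System.out.println("retention = " + retention);
    }
}
```

Adding some headroom on top of the computed value is a reasonable choice, since the UI's 30-day range is computed from wall-clock time while expiry follows stream time.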

For reading the data with the console consumer, you will need to specify the correct deserializers. The windowed serde and the long serde that you use to write into the output topic use binary formats, while the console consumer assumes string data by default. There are corresponding command-line parameters for setting the key and value deserializers; they must match the serializers you use when writing into the topic.
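To see why the console output looks empty: Serdes.Long() writes each count as 8 big-endian bytes, so a small count such as 5 is seven NUL bytes plus a 0x05 control character, which a terminal renders as blank. The time-windowed key serde is assumed here to write the tenant's UTF-8 bytes followed by the window start as an 8-byte big-endian timestamp. Also, the console consumer only prints keys when print.key=true is set. WindowedRecordDecoder is a hypothetical debugging aid that decodes those assumed layouts with plain JDK classes; the proper fix remains configuring matching deserializers (for example org.apache.kafka.common.serialization.LongDeserializer for the values).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Instant;

// Hypothetical debugging helper. Assumed byte layouts:
//  value: 8-byte big-endian long (count)
//  key:   UTF-8 tenant id bytes + 8-byte big-endian window start (epoch millis)
final class WindowedRecordDecoder {

    static long decodeCount(byte[] value) {
        return ByteBuffer.wrap(value).getLong(); // ByteBuffer is big-endian by default
    }

    static String decodeTenant(byte[] key) {
        return new String(key, 0, key.length - Long.BYTES, StandardCharsets.UTF_8);
    }

    static Instant decodeWindowStart(byte[] key) {
        long startMs = ByteBuffer.wrap(key, key.length - Long.BYTES, Long.BYTES).getLong();
        return Instant.ofEpochMilli(startMs);
    }

    public static void main(String[] args) {
        // Build a sample record in the assumed layout, then decode it
        byte[] tenant = "tenant-a".getBytes(StandardCharsets.UTF_8);
        long start = Instant.parse("2020-03-10T00:00:00Z").toEpochMilli();
        byte[] key = ByteBuffer.allocate(tenant.length + Long.BYTES).put(tenant).putLong(start).array();
        byte[] value = ByteBuffer.allocate(Long.BYTES).putLong(5L).array();

        System.out.println(decodeTenant(key) + " @ " + decodeWindowStart(key) + " -> " + decodeCount(value));
    }
}
```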
