Chris

Reputation: 1349

Kafka Streams - Creating Windowed State Store

The code below "works", but I am confused about the meaning of the values passed to Stores.persistentWindowStore(). I found the documentation (https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/state/Stores.html#persistentWindowStore-java.lang.String-long-int-long-boolean-), but the definitions of the arguments are not clear to me.

Should the windowedBy() value always match windowSize in persistentWindowStore()?

What should the retention period be set to? The retention policy of the source topic?

What does the number of segments do?

What is retaining duplicates for? The documentation seems to indicate setting it to true for joins?

long windowSize = TimeUnit.MINUTES.toMillis(15);
long retentionPeriod = windowSize * 4 * 6; // 6 hours
int numSegments = 2;
boolean retainDuplicates = false;

bdrStream.groupByKey().windowedBy(TimeWindows.of(windowSize))
    .aggregate(() -> Lists.newArrayList(),
        (aggKey, newValue, aggValue) -> {
            BdrData d = new BdrData();
            d.setCharge(newValue.getBdr().getCost());
            aggValue.add(d);
            return aggValue;
        },
        Materialized.<String, ArrayList<BdrData>>as(
            Stores.persistentWindowStore("store5", 
                retentionPeriod, 
                numSegments, 
                windowSize,
                retainDuplicates))
                .withKeySerde(Serdes.String())
                .withValueSerde(listBdrDataSerde))
    .toStream()
    .process(() -> new WindowAggregatorProcessor());

Upvotes: 1

Views: 5746

Answers (1)

Matthias J. Sax

Reputation: 62285

Should the windowedBy() value always match windowSize in persistentWindowStore()?

Yes.

What should the retention period be set to? The retention policy of the source topic?

It should match the retention period of the windows, which you can specify via Windows#until() (the default is 1 day).
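In other words, the retentionPeriod you pass to the store should be the same value you give to Windows#until(). A minimal sketch of that alignment (using a simple count() instead of your aggregate, so the store name and serdes here are just placeholders):

import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.Stores;

public class RetentionSketch {
    public static void main(String[] args) {
        long windowSize = TimeUnit.MINUTES.toMillis(15);
        long retentionPeriod = TimeUnit.HOURS.toMillis(6);
        int numSegments = 3;

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("input-topic");

        input.groupByKey()
            // until() keeps closed windows around for late-arriving records...
            .windowedBy(TimeWindows.of(windowSize).until(retentionPeriod))
            // ...and the store's retentionPeriod should be the same value
            .count(Materialized.<String, Long>as(
                    Stores.persistentWindowStore("count-store",
                        retentionPeriod, numSegments, windowSize, false))
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long()));
    }
}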

What does the number of segments do?

The number of segments determines how coarse/fine grained data (i.e., old windows) is expired. The segment size will be "retention-period / (#segments + 1)". Note that more segments give you more fine-grained data expiration but increase overhead (each segment uses its own RocksDB instance).
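For example, with the values from your snippet the math works out like this (a tiny worked example):

import java.util.concurrent.TimeUnit;

public class SegmentSizeSketch {
    public static void main(String[] args) {
        long retentionPeriod = TimeUnit.HOURS.toMillis(6);
        int numSegments = 2;

        // segment size = retention-period / (#segments + 1)
        long segmentSize = retentionPeriod / (numSegments + 1);

        // 6 hours / (2 + 1) = 2 hours, i.e. expired windows are dropped in 2-hour chunks
        System.out.println(TimeUnit.MILLISECONDS.toHours(segmentSize) + " hours per segment");
    }
}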

What is retaining duplicates for? The documentation seems to indicate setting it to true for joins?

By default, keys must be unique. If you enable retainDuplicates, you can store the same key multiple times. Enabling duplicates comes with a performance hit.
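If you did need duplicates (for example for a join-style store), the only change would be the last flag; a hypothetical sketch:

import java.util.concurrent.TimeUnit;

import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;

public class RetainDuplicatesSketch {
    public static void main(String[] args) {
        long windowSize = TimeUnit.MINUTES.toMillis(15);
        long retentionPeriod = TimeUnit.HOURS.toMillis(6);
        int numSegments = 3;

        // retainDuplicates = true: the same key can be stored multiple times per window,
        // which join-style stores need (at the cost of some performance)
        WindowBytesStoreSupplier joinStoreSupplier =
            Stores.persistentWindowStore("join-store",
                retentionPeriod, numSegments, windowSize, true);
    }
}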

Note:

This part of the API was reworked and simplified in the upcoming 2.1 release. Compare KIP-319 and KIP-328 for details.
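For reference, a rough sketch of what the simplified 2.1-style API looks like (Duration-based overloads, no segments parameter, and Materialized#withRetention() in place of Windows#until()); double-check the 2.1 javadocs for the exact signatures:

import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.WindowStore;

public class NewApiSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("input-topic");

        input.groupByKey()
            // Duration instead of long millis
            .windowedBy(TimeWindows.of(Duration.ofMinutes(15)))
            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("store5")
                // withRetention() replaces until() and the store's retentionPeriod argument
                .withRetention(Duration.ofHours(6))
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long()));
    }
}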

Upvotes: 3
