sobychacko

Reputation: 5924

KTable state store persistence

If I use a persistent store when materializing a KTable, will the state store be persistent across application restarts? For example, if I use the following:

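import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;

StreamsBuilder builder = new StreamsBuilder();
KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore("queryable-store-name");
KTable<Long, String> table = builder.table(
    "foo",
    Materialized.<Long, String>as(storeSupplier) // explicit types so the serde calls type-check
        .withKeySerde(Serdes.Long())
        .withValueSerde(Serdes.String()));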

Will the state store "queryable-store-name" still hold the state from previous runs after a restart? Let's say I send 50 records to topic foo and they get materialized in the state store. If the application is then restarted, will I still have those 50 records in the state store? If not, is there a way to achieve that?

Thanks!

Upvotes: 7

Views: 9078

Answers (1)

Matus Cimerman

Reputation: 447

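Yes, a persistent state store (RocksDB) is kept on local disk by default. When the application is restarted and its application.id hasn't changed, the state is recovered from disk and will contain all 50 records. If the local files are missing, Kafka Streams rebuilds the store by replaying its changelog data from Kafka instead. Processing then resumes from the last committed offset, so new records are added on top of the restored state.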
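Whether the previous state is found on restart depends on two Kafka Streams settings: `application.id` and `state.dir`. Below is a minimal stdlib-only sketch, not taken from the answer: it builds those settings as a plain `Properties` object and derives the per-application directory `<state.dir>/<application.id>` where local state is kept. The application id `my-ktable-app`, the directory `/var/lib/kafka-streams` and the broker address are hypothetical placeholders; in a real application the `Properties` would be passed to the `KafkaStreams` constructor together with the topology.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

public class StreamsStateConfig {

    // Builds the settings that decide where Kafka Streams keeps local state.
    // The keys are real config names; the values are supplied by the caller.
    static Properties buildConfig(String applicationId, String stateDir) {
        Properties props = new Properties();
        props.put("application.id", applicationId);       // must stay the same across restarts
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
        props.put("state.dir", stateDir);                 // pick a directory that survives reboots
        return props;
    }

    // Local state for an application lives under <state.dir>/<application.id>,
    // so a new application.id points the app at a different, empty directory.
    static Path appStateDir(Properties props) {
        return Paths.get(props.getProperty("state.dir"), props.getProperty("application.id"));
    }

    public static void main(String[] args) {
        // Hypothetical values, for illustration only.
        Properties props = buildConfig("my-ktable-app", "/var/lib/kafka-streams");
        System.out.println(appStateDir(props));
    }
}
```

Changing `application.id` (or moving `state.dir`) leaves the app with an empty local directory, so its stores have to be rebuilt from Kafka. The default `state.dir` sits under the system's temporary directory, which some hosts clear on reboot; setting it explicitly to a durable path is a common way to keep local state across machine restarts.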

Edit: It seems you're missing an aggregation operation on top of the KTable; this is required. See my code example:

final KStream<CustomerKey, ViewPage> viewPagesStream = builder.stream(INPUT_TOPIC);

final KTable<Windowed<ViewPageCountKey>, Long> uniqueViewPageCount = viewPagesStream
        // Re-key each page view by (projectId, url)
        .map((key, value) -> {
            ViewPageCountKey newKey = new ViewPageCountKey(key.getProjectId(), value.getUrl());
            return new KeyValue<>(newKey, value);
        })
        .filter((key, value) -> key != null)
        .groupByKey()
        // Hopping windows; counts are materialized in a window store named STORE_NAME
        .windowedBy(TimeWindows.of(WINDOW_SIZE).advanceBy(WINDOW_ADVANCE))
        .count(Materialized.as(STORE_NAME));
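The example counts page views in hopping windows (`TimeWindows.of(WINDOW_SIZE).advanceBy(WINDOW_ADVANCE)`), so one record can fall into several overlapping windows and `count()` keeps one counter per (key, window) pair. The stdlib Java sketch below reproduces that window assignment as I understand it; the class and method names and the sizes used (10 ms windows advancing by 5 ms) are illustrative assumptions, not values from the answer.

```java
import java.util.ArrayList;
import java.util.List;

public class HoppingWindows {

    // Returns the start timestamps (ms) of every hopping window of the given
    // size and advance that contains `timestamp`. Windows are [start, start + size).
    static List<Long> windowStartsFor(long timestamp, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest advance-aligned start whose window still covers the timestamp (never below 0).
        long start = (Math.max(0, timestamp - sizeMs + advanceMs) / advanceMs) * advanceMs;
        for (; start <= timestamp; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Hypothetical WINDOW_SIZE = 10 ms and WINDOW_ADVANCE = 5 ms.
        System.out.println(windowStartsFor(12, 10, 5)); // prints [5, 10]
        System.out.println(windowStartsFor(3, 10, 5));  // prints [0]
    }
}
```

With a 10 ms window advancing every 5 ms, a record at timestamp 12 is counted in the windows starting at 5 and 10, which is why the result table is keyed by `Windowed<ViewPageCountKey>` rather than by the plain key.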

Upvotes: 10
