Reputation: 21
I need to use the Kafka Streams DSL cache to reduce the amount of write volume to downstream processors. However, our app processes tombstones, which introduces a complication. For example, given the following records for a single key, K1:
<K1, V1>
<K1, V2>
<K1, V3>
The DSL cache may only emit the final record of:
<K1, V3>
With the DSL cache turned off, of course, it would emit all of the intermediate records:
<K1, V1>
<K1, V2>
<K1, V3>
Everything is working as expected so far. But, with tombstones, the raw sequence becomes:
<K1, V1>
<K1, V2>
<K1, V3>
<K1, NULL>
So, depending on when the cache is flushed, we may never see the final count. E.g.:
<K1, V1> | cached
<K1, V2> | flushed
<K1, V3> | cached
<K1, NULL> | deleted
would mean <K1, V2> is flushed, but <K1, V3> never is. The semantics I'm trying to achieve involve flushing the latest record for a given key in the cache whenever a tombstone is received for that key:
<K1, V1> | cached
<K1, V2> | flushed
<K1, V3> | cached
<K1, NULL> | emit the latest record (`<K1, V3>`), then delete.
I have not been able to do this with the DSL, and the Processor API doesn't expose the underlying cache, so I can't do it there either. I'm considering implementing a custom in-memory cache and using that with the Processor API, but it gets complicated: it seems like there could be data loss if the app is shut down ungracefully (e.g. SIGKILL). I'm not sure how the DSL cache handles ungraceful shutdowns either (maybe there's data loss there too), so perhaps my implementation could be modeled after the DSL cache.
Anyway, am I overthinking this problem? Is there a way to flush the latest record from the DSL cache when a tombstone is received, instead of implementing a custom cache?
Upvotes: 2
Views: 379
Reputation: 62350
we may never see the final count
I understand what you are saying; however, in this case the "final" record is the tombstone, so you do see the final one. What you want is a specific intermediate result. The DSL does not allow such fine-grained configuration.
the Processor API doesn't expose the underlying cache
Well, it does. Using Stores.keyValueStoreBuilder() you can call withCachingEnabled() on the returned StoreBuilder. Note that in this case, by default no records are emitted downstream, and you need to implement the emit logic manually. I.e., you don't know when the cache is flushed, and when it is flushed, it only flushes to local disk and the changelog topic; no data is emitted downstream on flush.
You could register a punctuation to emit data at regular time intervals. Also, each time you process a tombstone, you can emit the currently stored value for that key before you delete it from the store.
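A minimal, plain-Java sketch of that emit-on-tombstone logic, with a HashMap standing in for the caching state store and a list standing in for forwarding downstream (all names here are illustrative, not Kafka Streams API; in a real Processor the punctuator would periodically drain the store as well):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the desired semantics: buffer the latest value per key,
// and on a tombstone, emit that latest value before deleting it.
class TombstoneEmitSketch {
    final Map<String, String> store = new HashMap<>();   // stands in for the state store
    final List<String> emitted = new ArrayList<>();      // stands in for context.forward()

    void process(String key, String value) {
        if (value == null) {                             // tombstone
            String latest = store.get(key);
            if (latest != null) {
                emitted.add(key + "=" + latest);         // emit the latest cached value...
            }
            store.remove(key);                           // ...then delete it from the store
        } else {
            store.put(key, value);                       // buffer; emit later, e.g. from a punctuator
        }
    }
}
```

With the sequence from the question (V1, V2, V3, then a tombstone for K1), this emits only `K1=V3` at the tombstone, which is the behavior being asked for.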
Upvotes: 1