Reputation: 950
My application's memory usage keeps increasing (I check it with free -h). I suspect that the reason could be RocksDB. If a ValueState's value expires, will RocksDB remove it from its memory and delete it from the local storage directory? (I also have limited storage capacity.)
Also, regarding stream.keyBy(ipAddress): if this ipAddress is held by RocksDB (I am talking about the keyBy itself, not the state), is it always placed in managed memory? If not, will Flink's heap memory increase?
Here is the general structure of my application:
streamA = source.filter(..);
streamA2 = source2.filter(..);
streamB = streamA.keyBy(ipAddr).window().process(); // contains value state
streamC = streamA.keyBy(ipAddr).flatMap(..); // contains value state
streamD = streamA2.keyBy(ipAddr).window().process(); // contains value state
streamE = streamA.union(streamA2).keyBy(ipAddr)....
Here is the state example from my application:
private transient ValueState<SampleObject> sampleState;

StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.minutes(10))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();

ValueStateDescriptor<SampleObject> sampleValueStateDescriptor = new ValueStateDescriptor<>(
        "sampleState",
        TypeInformation.of(SampleObject.class)
);
sampleValueStateDescriptor.enableTimeToLive(ttlConfig);
sampleState = getRuntimeContext().getState(sampleValueStateDescriptor);
RocksDB configuration:
state.backend: rocksdb
state.backend.rocksdb.checkpoint.transfer.thread.num: 6
state.backend.rocksdb.localdir: /pathTo/checkpoint_only_local
Upvotes: 0
Views: 641
Reputation: 43697
Will Flink ValueState be removed from storage after it expires when using RocksDB?
Yes, but not immediately. (And in some earlier versions of Flink, the answer was "it depends".)
In your state TTL config you haven't specified how you want state cleanup to be done. In that case, expired values are explicitly removed on read (e.g., by ValueState#value) and are otherwise periodically garbage collected in the background. In the case of RocksDB, this background cleanup is done during compaction. In other words, the cleanup isn't immediate. The docs provide more details on how you can tune this -- you could configure the cleanup to be done more quickly, at the expense of some performance degradation.
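For illustration, a minimal sketch of that tuning, reusing the TTL config from the question (the 1000-entry interval is an arbitrary example value, not a recommendation):

StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.minutes(10))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        // Have the RocksDB compaction filter re-query the current timestamp
        // after every 1000 state entries it processes, so expired entries are
        // dropped more eagerly during compaction (at some CPU cost).
        .cleanupInRocksdbCompactFilter(1000)
        .build();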
A keyBy itself does not use any state. The key selector function is used to partition the stream, but the keys are not stored in connection with the keyBy. Only the window and flatMap operations are keeping state, which is per-key state, and all of this keyed state will be in RocksDB (unless you have configured your timers to be on the heap, which is an option, but in Flink 1.10 timers are stored off-heap, in RocksDB, by default).
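For reference, where the timers live is controlled by this setting, whose default in Flink 1.10 is ROCKSDB:

state.backend.rocksdb.timer-service.factory: ROCKSDB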
You could change the flatMap to a KeyedProcessFunction and use timers to explicitly clear the state for stale keys -- which would give you direct control over exactly when the state is cleared, rather than relying on the state TTL mechanism to eventually clear the state.
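A minimal sketch of that approach, assuming hypothetical Event and Result types (the 10-minute timeout mirrors the TTL in the question):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ExpiringStateFunction extends KeyedProcessFunction<String, Event, Result> {

    private static final long TTL_MS = 10 * 60 * 1000; // 10 minutes

    private transient ValueState<SampleObject> sampleState;
    private transient ValueState<Long> cleanupTimer; // timestamp of the pending cleanup timer

    @Override
    public void open(Configuration parameters) {
        sampleState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("sampleState", SampleObject.class));
        cleanupTimer = getRuntimeContext().getState(
                new ValueStateDescriptor<>("cleanupTimer", Long.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Result> out) throws Exception {
        // ... read/update sampleState and emit results, as the flatMap did ...

        // Re-arm the cleanup timer: delete the previous one, register a new
        // one 10 minutes from now, and remember its timestamp.
        Long pending = cleanupTimer.value();
        if (pending != null) {
            ctx.timerService().deleteProcessingTimeTimer(pending);
        }
        long cleanupAt = ctx.timerService().currentProcessingTime() + TTL_MS;
        ctx.timerService().registerProcessingTimeTimer(cleanupAt);
        cleanupTimer.update(cleanupAt);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out) throws Exception {
        // The key has been idle for 10 minutes; drop its state explicitly.
        sampleState.clear();
        cleanupTimer.clear();
    }
}

Note that the timers themselves are keyed state too (also in RocksDB by default), so this trades TTL bookkeeping for timer bookkeeping; what you gain is deterministic cleanup.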
But it's more likely that the windows are building up considerable state. If you can switch to doing pre-aggregation (via reduce or aggregate), that may help a lot.
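For instance, a sketch of what incremental pre-aggregation could look like for one of the windowed streams; the window assigner, window size, and merge method are illustrative assumptions, since the question doesn't show the actual window or process logic:

import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

streamB = streamA
        .keyBy(ipAddr)
        .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
        // The ReduceFunction combines elements as they arrive, so the window
        // keeps one rolling value per key instead of buffering every element.
        .reduce((a, b) -> a.merge(b)); // merge(..) is a hypothetical combiner

If the ProcessWindowFunction logic is still needed (for example, to access window metadata), reduce and aggregate both have overloads that take a ProcessWindowFunction as a second argument; the window then stores only the pre-aggregated value and hands it to the process function when the window fires.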
Upvotes: 1