Reputation: 199
I'm using Flink's MapBundleFunction to implement mini-batch processing with the DataStream API. To do this, I key the stream by an attribute (of Flink's internal RowData type) and accumulate the elements in a HashMap (of type Map<RowData, List<MyPojo>>). When finishBundle is triggered, I iterate over all the keys, set the current key on the context on each iteration with

    ctx.setCurrentKey(currentKey);

then perform the transformations and update the state.
Here is what the key components of the code look like.
public void setStateKey(String id) {
    this.stateKey = GenericRowData.of(id);
}
Keying of the stream
stream1
    .union(stream2)
    .keyBy(MyPojo::getStateKey)
    .transform("M", TypeInformation.of(MyPojo.class), microBatchAccumulator)
    .sinkTo(kafkaSink);
Implementation of the finishBundle function.
@Override
public void finishBundle(Map<RowData, List<MyPojo>> buffer, Collector<MyPojo> collector) throws Exception {
    buffer.forEach((currentKey, currentValue) -> {
        log.info("Hash of the current key -> {}", currentKey.hashCode());
        ctx.setCurrentKey(currentKey);
        // ... some code ...
This setup works fine locally, but when deployed on Kubernetes it fails with the following exception:
Caused by: java.lang.IllegalArgumentException: Key group 193 is not in KeyGroupRange{startKeyGroup=144, endKeyGroup=179}. Unless you're directly using low level state access APIs, this is most likely caused by non-deterministic shuffle key (hashCode and equals implementation).
at org.apache.flink.runtime.state.KeyGroupRangeOffsets.newIllegalKeyGroupException(KeyGroupRangeOffsets.java:37) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.runtime.state.InternalKeyContextImpl.setCurrentKeyGroupIndex(InternalKeyContextImpl.java:74) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.setCurrentKey(AbstractKeyedStateBackend.java:249) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.setCurrentKey(RocksDBKeyedStateBackend.java:430) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:383) ~[flink-dist-1.19.0.jar:1.19.0]
... 18 more
The hash code I'm logging is also inconsistent across restarts for the same string IDs being processed.
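To make concrete what "non-deterministic shuffle key" means in the exception, here is a minimal, Flink-free sketch (BadKey and KeyHashDemo are hypothetical names, not from my code): a key type whose hashCode is not content-based falls back to Object's identity hash, which differs between objects and between JVM runs, so the same logical key can hash into a key group the current subtask does not own.

```java
// Hypothetical key class for illustration only: hashCode()/equals()
// are deliberately NOT overridden, so Object's identity hash is used.
final class BadKey {
    final String id;
    BadKey(String id) { this.id = id; }
}

public class KeyHashDemo {
    public static void main(String[] args) {
        BadKey a = new BadKey("user-1");
        BadKey b = new BadKey("user-1");
        // Identity hashes: almost certainly different for a and b,
        // and different again on every JVM run, even though the
        // content is equal.
        System.out.println(a.hashCode() == b.hashCode());
        // String.hashCode() is content-based and stable across JVMs,
        // which is what a shuffle key needs.
        System.out.println(a.id.hashCode() == b.id.hashCode()); // true
    }
}
```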
Flink version -> 1.19.0
parallelism -> 20
max parallelism -> 720
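For reference, the KeyGroupRange in the exception is consistent with these settings. Below is a sketch of how I understand Flink derives each subtask's key-group range (mirroring my reading of KeyGroupRangeAssignment#computeKeyGroupRangeForOperatorIndex; verify against the Flink source). With maxParallelism=720 and parallelism=20, each subtask owns 36 key groups, and subtask index 4 owns exactly the range 144..179 from the exception:

```java
// Sketch only -- not Flink code, but reproduces the range math
// I believe Flink uses to split key groups across subtasks.
public class KeyGroupRangeDemo {
    static int rangeStart(int maxParallelism, int parallelism, int subtaskIndex) {
        return (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    static int rangeEnd(int maxParallelism, int parallelism, int subtaskIndex) {
        return ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
    }

    public static void main(String[] args) {
        // Matches the exception: KeyGroupRange{startKeyGroup=144, endKeyGroup=179}
        System.out.println(rangeStart(720, 20, 4) + ".." + rangeEnd(720, 20, 4)); // 144..179
    }
}
```

So key group 193 belongs to a different subtask; the exception fires because the key set via ctx.setCurrentKey hashed outside the range the current subtask owns.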
Upvotes: 1
Views: 49