Aman Vaishya

Reputation: 199

Issue while setting current context using MapBundleOperator in Flink

I'm using Flink's MapBundleFunction to do mini-batch processing on a DataStream. To do this, I key the stream on an attribute (of Flink's internal RowData type) and accumulate the elements in a HashMap (of type <RowData, List<MyPojo>>). When finishBundle is triggered, I iterate over all the keys and, on each iteration, set the current key on the context using

ctx.setCurrentKey(currentKey);

before performing the transformations and updating the state.
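
To make the accumulate-then-flush pattern described above concrete, here is a minimal plain-Java sketch. It is not Flink's MapBundleFunction or its operator; the class name `MiniBatchSketch`, the `maxBundleSize` threshold, and the `onFlush` callback are all hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical stand-in for a bundle operator; not Flink's actual class. */
public class MiniBatchSketch<K, V> {
    private final Map<K, List<V>> buffer = new HashMap<>();
    private final int maxBundleSize;                 // assumed count-based trigger
    private final BiConsumer<K, List<V>> onFlush;    // called once per buffered key
    private int count = 0;

    public MiniBatchSketch(int maxBundleSize, BiConsumer<K, List<V>> onFlush) {
        this.maxBundleSize = maxBundleSize;
        this.onFlush = onFlush;
    }

    /** Buffers one element under its key and flushes when the bundle is full. */
    public void add(K key, V value) {
        buffer.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        if (++count >= maxBundleSize) {
            finishBundle();
        }
    }

    /** Hands every buffered key and its elements to the callback, then resets. */
    public void finishBundle() {
        buffer.forEach(onFlush);
        buffer.clear();
        count = 0;
    }
}
```

In the real operator, the per-key callback is where ctx.setCurrentKey(currentKey) would be called before touching keyed state.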

Here are the key components of the code.

  1. Setter of the key on which the stream is keyed

    public void setStateKey(String id) {
        this.stateKey = GenericRowData.of(id);
    }

  2. Keying of the stream

    stream1
       .union(stream2)
       .keyBy(MyPojo::getStateKey)
       .transform("M", TypeInformation.of(MyPojo.class), microBatchAccumulator)
       .sinkTo(kafkaSink);

  3. Implementation of the finishBundle function

    @Override
    public void finishBundle(Map<RowData, List<MyPojo>> buffer, Collector<MyPojo> collector) throws Exception {

        buffer.forEach((currentKey, currentValue) -> {

            log.info("Hash of the current Key -> {}", currentKey.hashCode());

            // Point the keyed state backend at this key before accessing state
            ctx.setCurrentKey(currentKey);
            // ... some code ...
    

This setup works fine locally, but when deployed on Kubernetes it fails with the following exception:

Caused by: java.lang.IllegalArgumentException: Key group 193 is not in KeyGroupRange{startKeyGroup=144, endKeyGroup=179}. Unless you're directly using low level state access APIs, this is most likely caused by non-deterministic shuffle key (hashCode and equals implementation).
    at org.apache.flink.runtime.state.KeyGroupRangeOffsets.newIllegalKeyGroupException(KeyGroupRangeOffsets.java:37) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.state.InternalKeyContextImpl.setCurrentKeyGroupIndex(InternalKeyContextImpl.java:74) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.setCurrentKey(AbstractKeyedStateBackend.java:249) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.setCurrentKey(RocksDBKeyedStateBackend.java:430) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:383) ~[flink-dist-1.19.0.jar:1.19.0]
    ... 18 more

The hash code that I'm logging also differs across restarts for the same string IDs.
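
One possible explanation worth ruling out (an assumption, not something confirmed in this post): if the key object's hashCode() mixes in an enum field, the result can change between JVM runs, because Enum.hashCode() is identity-based. I believe GenericRowData does this through its RowKind field, but treat that as unverified. The sketch below uses a hypothetical `Kind` enum and hypothetical helper names to contrast such a hash with one derived only from the string contents.

```java
import java.util.Objects;

public class KeyHashSketch {
    /** Hypothetical enum standing in for any enum field inside a key object. */
    public enum Kind { INSERT, DELETE }

    /**
     * Hash that mixes in an enum. Enum.hashCode() is the identity hash,
     * so this value is stable within one JVM but may differ between runs.
     */
    public static int hashWithEnum(Kind kind, String id) {
        return 31 * Objects.hash(kind) + id.hashCode();
    }

    /**
     * Hash derived only from the string contents. String.hashCode() is
     * specified by the JDK, so it is the same on every JVM.
     */
    public static int hashFromIdOnly(String id) {
        return id.hashCode();
    }

    public static void main(String[] args) {
        // Run this twice in separate JVMs and compare the first line.
        System.out.println("with enum: " + hashWithEnum(Kind.INSERT, "abc"));
        System.out.println("id only:   " + hashFromIdOnly("abc"));
    }
}
```

If this turns out to be the cause, keying by the plain String id rather than by a wrapper object would be one way to get a deterministic shuffle key.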

Flink version -> 1.19.0
parallelism -> 20
max parallelism -> 720
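
For reference, the numbers in the exception can be related to these settings with a little arithmetic. The sketch below reproduces, in plain Java, the integer formulas that I understand Flink's KeyGroupRangeAssignment to use; the class and method names here are hypothetical, and the formulas should be checked against the Flink source for your version.

```java
public class KeyGroupSketch {
    /** First and last key group (inclusive) owned by a subtask. */
    public static int[] rangeForSubtask(int maxParallelism, int parallelism, int subtaskIndex) {
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    /** Index of the subtask that owns a given key group. */
    public static int subtaskForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int[] range = rangeForSubtask(720, 20, 4);
        System.out.println(range[0] + ".." + range[1]);        // prints 144..179
        System.out.println(subtaskForKeyGroup(720, 20, 193));  // prints 5
    }
}
```

Under these assumptions, each of the 20 subtasks owns 36 key groups. The range 144 to 179 in the exception belongs to subtask index 4, while key group 193 belongs to subtask index 5. In other words, the key passed to setCurrentKey hashed to a different key group on this subtask than the one used when the record was shuffled to it.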
