Reputation: 1
I am facing a problem where a KeyedStream is poorly parallelised across workers when the number of keys is close to the parallelism.
My input records are keyed by integers in the range 0-N. When I use keyBy, some workers process zero keys and some process more than one. This is because KeyGroupRangeAssignment.assignKeyToParallelOperator(), which KeyGroupStreamPartitioner uses, applies murmurHash to key.hashCode() to pick a key group, and the key group then determines the channel.
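To see the skew concretely, here is a minimal standalone sketch (no Flink dependency) of the key → key group → subtask mapping: key group = murmurHash(key.hashCode()) % maxParallelism, then subtask = keyGroup * parallelism / maxParallelism. The class and method names are made up for illustration. The hash body is my reading of Flink's MathUtils.murmurHash, so check it against your Flink version. maxParallelism = 128 is assumed because that is Flink's default lower bound. With Flink on the classpath you could call KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism) instead of reimplementing it.

```java
import java.util.Arrays;

/** Illustrative sketch of how keyBy() spreads keys over subtasks (not Flink code). */
public class KeyDistribution {

    // Assumed mirror of org.apache.flink.util.MathUtils.murmurHash: returns a non-negative int.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        // Finalizer (MathUtils.bitMix in Flink).
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        if (code >= 0) {
            return code;
        } else if (code != Integer.MIN_VALUE) {
            return -code;
        } else {
            return 0;
        }
    }

    // key -> key group -> operator (subtask) index.
    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        int keyGroup = murmurHash(key.hashCode()) % maxParallelism;
        return keyGroup * parallelism / maxParallelism;
    }

    // Counts how many of the keys 0..numKeys-1 land on each subtask.
    static int[] keysPerSubtask(int numKeys, int maxParallelism, int parallelism) {
        int[] counts = new int[parallelism];
        for (int key = 0; key < numKeys; key++) {
            counts[subtaskFor(key, maxParallelism, parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 8 integer keys on 8 subtasks, assuming maxParallelism = 128.
        System.out.println(Arrays.toString(keysPerSubtask(8, 128, 8)));
    }
}
```

main prints one count per subtask. With only 8 keys hashed into 128 key groups, nothing forces one key per subtask, so some counts can be 0 while others are 2 or more.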
I know partitionCustom can handle this situation, but partitionCustom only returns a DataStream, not a KeyedStream.
So how can I handle this without resorting to hacks?
Upvotes: 0
Views: 667
Reputation: 3634
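It's unfortunately a bit buried, but DataStreamUtils.reinterpretAsKeyedStream() turns a DataStream into a KeyedStream. Note that the Flink docs list this as an experimental feature and require the stream to already be partitioned exactly as keyBy() would partition it (same key-group assignment); a custom partitioner that deviates from that can lead to incorrect results or errors when accessing keyed state.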
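// partitioner (a Partitioner<K>) and keySelector (a KeySelector<T, K>) are yours to supply
DataStream<T> partitioned = input.partitionCustom(partitioner, keySelector);
KeyedStream<T, K> keyed = DataStreamUtils.reinterpretAsKeyedStream(partitioned, keySelector);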
Upvotes: 0
Reputation: 9245
Well, it's kind of hack-ish, but see makeKeyForOperatorIndex. I've used a custom RichMapFunction that figures out which subtask index it is running as in its open() call, and then uses makeKeyForOperatorIndex to create a key (Integer or String) that is added as a field and then used for the keyBy().
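A minimal sketch of the idea: search Integer keys until one lands on the wanted subtask. This makeKeyForOperatorIndex is a hypothetical re-implementation, not the helper referenced above, and OperatorKeys is an illustrative name. It assumes the hash matches Flink's MathUtils.murmurHash (verify against your Flink version) and maxParallelism = 128. Inside a RichMapFunction, the subtask index would come from getRuntimeContext().getIndexOfThisSubtask() in open().

```java
/** Illustrative sketch: find a key that keyBy() would route to a given subtask. */
public class OperatorKeys {

    // Assumed mirror of org.apache.flink.util.MathUtils.murmurHash.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        if (code >= 0) {
            return code;
        } else if (code != Integer.MIN_VALUE) {
            return -code;
        } else {
            return 0;
        }
    }

    // key -> key group -> operator (subtask) index.
    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        int keyGroup = murmurHash(key.hashCode()) % maxParallelism;
        return keyGroup * parallelism / maxParallelism;
    }

    // Hypothetical helper: brute-force the smallest non-negative Integer key for operatorIndex.
    static Integer makeKeyForOperatorIndex(int maxParallelism, int parallelism, int operatorIndex) {
        if (operatorIndex < 0 || operatorIndex >= parallelism) {
            throw new IllegalArgumentException("operatorIndex out of range: " + operatorIndex);
        }
        for (int candidate = 0; candidate < Integer.MAX_VALUE; candidate++) {
            if (subtaskFor(candidate, maxParallelism, parallelism) == operatorIndex) {
                return candidate;
            }
        }
        throw new IllegalStateException("No key found for operator index " + operatorIndex);
    }

    public static void main(String[] args) {
        int parallelism = 8;
        int maxParallelism = 128;
        for (int i = 0; i < parallelism; i++) {
            System.out.println("subtask " + i + " <- key "
                    + makeKeyForOperatorIndex(maxParallelism, parallelism, i));
        }
    }
}
```

The generated key only lands on the intended subtask if the keyed operator downstream runs with the same parallelism and maxParallelism that were passed to the search.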
Upvotes: 2