xiaoxiao

Reputation: 1

Unbalanced processing of KeyedStream

I am facing a problem where a KeyedStream is poorly parallelised across workers when the number of keys is close to the parallelism.

My input records have keys in the range 0..N. When I use keyBy, some workers process zero keys and others process more than one. This is because KeyGroupStreamPartitioner calls KeyGroupRangeAssignment.assignKeyToParallelOperator(), which applies murmurHash to key.hashCode() and uses the result to select the channel.
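To see the imbalance described above, here is a plain-Java sketch of the key-to-subtask routing (key -> murmur hash of hashCode() -> key group -> subtask index). It is modeled on Flink's MathUtils.murmurHash and KeyGroupRangeAssignment, not copied from them, and it assumes the default max parallelism of 128 that Flink picks for low-parallelism jobs. The class KeyGroupModel and its methods are hypothetical names; for authoritative numbers, call KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism) from your own Flink version.

```java
import java.util.Arrays;

// Hypothetical model of how keyBy() routes a key to a subtask.
// Modeled on Flink's MathUtils.murmurHash and KeyGroupRangeAssignment; verify against your Flink version.
class KeyGroupModel {

    // Assumption: max parallelism left at Flink's default for small jobs.
    static final int MAX_PARALLELISM = 128;

    // Final avalanche step of the murmur hash.
    static int bitMix(int in) {
        in ^= in >>> 16;
        in *= 0x85ebca6b;
        in ^= in >>> 13;
        in *= 0xc2b2ae35;
        in ^= in >>> 16;
        return in;
    }

    // Murmur-style scrambling of key.hashCode(); always returns a non-negative int.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code = bitMix(code);
        if (code >= 0) {
            return code;
        }
        return code != Integer.MIN_VALUE ? -code : 0;
    }

    // key -> key group -> subtask index (each subtask owns a contiguous range of key groups).
    static int subtaskFor(Object key, int parallelism) {
        int keyGroup = murmurHash(key.hashCode()) % MAX_PARALLELISM;
        return keyGroup * parallelism / MAX_PARALLELISM;
    }

    // How many of the distinct keys 0..numKeys-1 land on each subtask.
    static int[] keysPerSubtask(int numKeys, int parallelism) {
        int[] counts = new int[parallelism];
        for (int k = 0; k < numKeys; k++) {
            counts[subtaskFor(k, parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // With as many keys as subtasks, a perfectly even spread would print all 1s.
        System.out.println(Arrays.toString(keysPerSubtask(8, 8)));
    }
}
```

Because keys are first hashed into 128 key groups and only then split into contiguous ranges per subtask, a handful of keys behaves like random throws into buckets, so some subtasks typically get none and others get several.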

I know partitionCustom can handle this situation, but partitionCustom only returns a DataStream, not a KeyedStream.

So how can I handle this without resorting to something hack-ish?

Upvotes: 0

Views: 667

Answers (2)

Arvid Heise

Reputation: 3634

It's unfortunately a bit buried, but there is a way of turning a DataStream into a KeyedStream:

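DataStream<T> partitioned = input.partitionCustom(...);
// Caveat from the Flink docs: this is only safe if the stream is already partitioned
// exactly as keyBy() would partition it (same key-group assignment).
KeyedStream<T, K> keyed = DataStreamUtils.reinterpretAsKeyedStream(partitioned, ...);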

Upvotes: 0

kkrugler

Reputation: 9245

Well, it's kind of hack-ish, but see makeKeyForOperatorIndex. I've used a custom RichMapFunction that determines its own subtask index in its open() call, and then uses makeKeyForOperatorIndex to create a key (Integer or String) that is added as a field and then used for the keyBy().
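A minimal sketch of that idea, assuming Integer keys, Flink's default max parallelism of 128, and a downstream keyBy() running with the same parallelism you pass in. The actual makeKeyForOperatorIndex code isn't reproduced here; keyForSubtask below is a hypothetical stand-in that brute-forces the smallest non-negative Integer routed to the requested subtask, using a routing model modeled on KeyGroupRangeAssignment (verify it against your Flink version). Inside a RichMapFunction you would call it from open() with getRuntimeContext().getIndexOfThisSubtask() and getRuntimeContext().getNumberOfParallelSubtasks().

```java
// Hypothetical helper in the spirit of makeKeyForOperatorIndex (not its actual code).
// Assumes Integer keys and the routing model
// key -> murmurHash(hashCode) % maxParallelism -> keyGroup * parallelism / maxParallelism.
class SubtaskKeys {

    // Assumption: max parallelism left at Flink's default for small jobs.
    static final int MAX_PARALLELISM = 128;

    // Murmur-style scrambling of a hash code; returns a non-negative int.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        // Final avalanche step (bitMix in Flink's MathUtils).
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        if (code >= 0) {
            return code;
        }
        return code != Integer.MIN_VALUE ? -code : 0;
    }

    // Subtask index that keyBy() would route this Integer key to.
    static int subtaskFor(int key, int parallelism) {
        int keyGroup = murmurHash(Integer.hashCode(key)) % MAX_PARALLELISM;
        return keyGroup * parallelism / MAX_PARALLELISM;
    }

    // Smallest non-negative Integer key routed to the given subtask index.
    static int keyForSubtask(int subtaskIndex, int parallelism) {
        if (parallelism < 1 || parallelism > MAX_PARALLELISM
                || subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("bad subtask " + subtaskIndex + " for parallelism " + parallelism);
        }
        for (int key = 0; key < Integer.MAX_VALUE; key++) {
            if (subtaskFor(key, parallelism) == subtaskIndex) {
                return key;
            }
        }
        throw new IllegalStateException("no key found for subtask " + subtaskIndex);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        for (int i = 0; i < parallelism; i++) {
            System.out.println("subtask " + i + " -> key " + keyForSubtask(i, parallelism));
        }
    }
}
```

Since every key maps to exactly one subtask, the keys generated for different indices are distinct, so with matching parallelism each keyed subtask receives exactly one of them. If you change the max parallelism or rescale the job, the keys have to be regenerated.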

Upvotes: 2
