gcandal

Reputation: 957

Can I use a custom partitioner with group by?

Let's say that I know that my dataset is unbalanced and I know the distribution of the keys. I'd like leverage this to write a custom partitioner to get the most out of the operator instances.

I know about DataStream#partitionCustom. However, if my stream is keyed, will it still work properly? My job would look something like:

DataStream afterCustomPartition = keyedStream.partitionCustom(new MyPartitioner(), new MyPartitionKeySelector());

DataStreamUtils.reinterpretAsKeyedStream(afterCustomPartition, new MyGroupByKeySelector<>()).sum()

What I'm trying to achieve is:

Example of the use-case:

Upvotes: 2

Views: 1067

Answers (2)

Alex

Reputation: 11

For those who are still looking for a solution, I made a dirty hack to work around this limitation. It is a bad workaround, so I would only recommend it if there is absolutely no other way to solve your issue. What I did is create an inverse function for the MurmurHash function that Flink uses internally to partition the data. You can then provide a tampered key to the KeySelector like this:

stream.keyBy(x -> inverseMurmurHash(x.getSomeIntField()))

And that will be mapped to the int value you provided from the record.

With this function you can basically implement your own physical partitioner that returns a KeyedStream. Of course you need to be mindful and careful, because this will only keep working as long as Flink doesn't change its hash algorithm.
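To illustrate the idea, here is a minimal, untested sketch. The class and method names (InverseKeyGroupMapper, keyForOperator) are made up for this example, and it assumes you can reach Flink's internal KeyGroupRangeAssignment class. It brute-forces an int that Flink's hash-based key-group assignment routes to the operator instance you want, which you can then return from your KeySelector:

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

// Hypothetical helper that "inverts" Flink's key-group assignment by searching
// for an int key that lands on the desired parallel operator instance.
public class InverseKeyGroupMapper {

    private final int maxParallelism;
    private final int parallelism;

    public InverseKeyGroupMapper(int maxParallelism, int parallelism) {
        this.maxParallelism = maxParallelism;
        this.parallelism = parallelism;
    }

    // Brute-force search: try ints until one is assigned to the target operator.
    // Only valid as long as Flink keeps the same internal hash algorithm.
    public int keyForOperator(int targetOperatorIndex) {
        for (int candidate = 0; candidate >= 0; candidate++) {
            int operatorIndex = KeyGroupRangeAssignment
                    .assignKeyToParallelOperator(candidate, maxParallelism, parallelism);
            if (operatorIndex == targetOperatorIndex) {
                return candidate;
            }
        }
        throw new IllegalStateException("No key found for operator " + targetOperatorIndex);
    }
}

Since the search is far too slow to run per record, you would precompute one key per operator index and cache the results.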

Upvotes: 0

Fabian Hueske

Reputation: 18987

That is unfortunately not possible. DataStreamUtils.reinterpretAsKeyedStream() requires that the data is partitioned exactly as it would be if you had called keyBy().

The reason for this limitation is key groups and how keys are mapped to key groups. A key group is Flink's unit for distributing keyed state. The number of key groups determines the maximum parallelism of an operator and is configured with setMaxParallelism(). Keys are assigned to key groups with an internal hash function. If you change the partitioning of the keys, the keys of a single key group end up distributed across multiple machines, which will not work.
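For reference, the mapping roughly works as sketched below (a simplified illustration of what KeyGroupRangeAssignment does internally, not the actual Flink source; the class name KeyGroupSketch is made up):

import org.apache.flink.util.MathUtils;

// Simplified view of Flink's key -> key group -> operator mapping.
public class KeyGroupSketch {

    // A key is hashed into one of maxParallelism key groups.
    static int keyGroupFor(Object key, int maxParallelism) {
        return MathUtils.murmurHash(key.hashCode()) % maxParallelism;
    }

    // Each operator instance owns a contiguous range of key groups.
    static int operatorFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }
}

A custom partitioner sends a record to an operator other than operatorFor(keyGroupFor(key)), so the keyed state for that key would live on a different machine than the record.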

In order to tweak the assignment of keys to machines, you would need to change the assignment of keys to key groups. However, there is no public or accessible interface to do that. Therefore, custom key distributions are not supported in Flink 1.6.

Upvotes: 3
