Reputation: 809
While using a custom partitioner in Apache Flink, I would like to assign some elements of the dataset to more than one partition. Currently I duplicate those elements and assign each copy to one cluster. Is there a way to do this directly? If not, what is an efficient way of duplicating a subset of the dataset?
Upvotes: 3
Views: 169
Reputation: 13346
In order to generate overlapping partitions, you first have to duplicate your elements. Given that you know which elements to duplicate, this can be done with the flatMap operation. Since you want to assign duplicated elements to different partitions, it is best to assign the partition ID from within the flatMap operation. Based on this ID you can then apply the partitioning step.
Given an input data set input: DataSet[IN], you generate a duplicated data set duplicated: DataSet[(Int, IN)], which contains tuples pairing each partition ID with the original input element. Afterwards you can apply the partitioning on the first tuple field, which holds the partition ID.
val duplicatedDS: DataSet[(Int, IN)] = input.flatMap(x => duplicateElement(x))
val partitioned = duplicatedDS.partitionByHash(0)
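The snippet above leaves duplicateElement undefined, so here is a minimal sketch of what it might look like. Everything in it is an assumption made for illustration: the element type is Int, the object name OverlappingPartitions and the class IdPartitioner are hypothetical, and the rule "multiples of 10 also go to the next partition" merely stands in for whatever criterion you actually use. It also assumes a Flink version that still ships the Scala DataSet API. Note that partitionByHash(0) only guarantees that equal IDs end up together; two different IDs may hash to the same partition. If each ID should map to one specific partition, partitionCustom with a Partitioner[Int] is one way to do that, as sketched here.

```scala
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.api.scala._

object OverlappingPartitions {

  // Hypothetical duplication rule (an assumption, not part of the answer):
  // every element gets a "home" partition (x mod numPartitions), and
  // multiples of 10 are additionally emitted for the following partition.
  def duplicateElement(x: Int, numPartitions: Int): Seq[(Int, Int)] = {
    val home = Math.floorMod(x, numPartitions)
    if (x % 10 == 0) Seq((home, x), ((home + 1) % numPartitions, x))
    else Seq((home, x))
  }

  // Routes each tuple to the partition whose index equals its ID field.
  // floorMod keeps the result inside [0, numPartitions).
  class IdPartitioner extends Partitioner[Int] {
    override def partition(key: Int, numPartitions: Int): Int =
      Math.floorMod(key, numPartitions)
  }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val numPartitions = env.getParallelism

    val input: DataSet[Int] = env.fromElements(1, 2, 10, 13, 20)

    // Duplicate inside flatMap and attach the target partition ID.
    val duplicated: DataSet[(Int, Int)] =
      input.flatMap(x => duplicateElement(x, numPartitions))

    // Partition on tuple field 0, the partition ID.
    val partitioned = duplicated.partitionCustom(new IdPartitioner, 0)

    partitioned.print()
  }
}
```

With a parallelism of 1 both copies of a duplicated element land in the same partition, so the overlap only becomes visible with two or more partitions.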
Upvotes: 4