Ahmad.S

Reputation: 809

Overlapping partitions in Apache Flink

When using a custom partitioner in Apache Flink, I would like to assign some elements of the data set to more than one partition. Currently I duplicate those elements and assign each copy to one cluster. Is there a built-in way to do this? If not, what is an efficient way to duplicate a subset of the data set?

Upvotes: 3

Views: 169

Answers (1)

Till Rohrmann

Reputation: 13346

To generate overlapping partitions, you first have to duplicate your elements. Given that you know which elements to duplicate, this can be done with the flatMap operation. Since you want to assign the duplicated elements to different partitions, it is best to assign the partition ID from within the flatMap operation. Based on this ID, you can then apply the partitioning step.

Given an input data set input: DataSet[IN], you generate a duplicated data set duplicatedDS: DataSet[(Int, IN)], which contains tuples of the original input elements and their corresponding partition IDs. Afterwards you can apply the partitioning on the first tuple field (index 0).

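// duplicateElement is user-defined: it returns one (partitionId, element) tuple per target partition
val duplicatedDS: DataSet[(Int, IN)] = input.flatMap(x => duplicateElement(x))
// partition on tuple field 0, i.e. the partition ID
val partitioned = duplicatedDS.partitionByHash(0)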
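
The answer leaves duplicateElement undefined, so here is a minimal sketch of what it might look like. Everything in it is an assumption for illustration: the elements are non-negative Ints, each partition owns a range of Width values, and values within Margin of a range boundary are also copied to the neighbouring partition. Plain Scala collections stand in for the DataSet so the sketch runs without Flink.

```scala
object OverlapSketch {
  // Hypothetical layout: partition p owns values in [p * Width, (p + 1) * Width);
  // values within Margin of a boundary are also copied to the neighbour.
  val Width = 10
  val Margin = 2
  val NumPartitions = 3

  // Returns one (partitionId, element) pair per partition the element belongs to.
  // Assumes x is non-negative.
  def duplicateElement(x: Int): Seq[(Int, Int)] = {
    val home = x / Width
    val offset = x % Width
    val lower = if (offset < Margin) Seq(home - 1) else Seq.empty[Int]
    val upper = if (offset >= Width - Margin) Seq(home + 1) else Seq.empty[Int]
    (lower ++ Seq(home) ++ upper)
      .filter(p => p >= 0 && p < NumPartitions) // drop neighbours that do not exist
      .map(p => (p, x))
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(1, 5, 9, 11, 25)
    // Stand-in for input.flatMap(x => duplicateElement(x)) on a DataSet.
    val duplicated = input.flatMap(duplicateElement)
    // Stand-in for the partitioning step: group the copies by the ID in field 0.
    val partitions = duplicated.groupBy(_._1).map { case (id, pairs) => id -> pairs.map(_._2) }
    partitions.toSeq.sortBy(_._1).foreach(println)
  }
}
```

Note that partitionByHash(0) only guarantees that tuples with the same ID end up in the same partition; two different IDs may still share one. If each ID has to map to a specific partition index, the DataSet API's partitionCustom with a Partitioner[Int] on field 0 is probably the closer fit for the custom partitioner mentioned in the question.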

Upvotes: 4
