Reputation: 109
I have a use case wherein I need to use a custom partitioner and eventually sort the partitions.
implicit val ordering: Ordering[Array[Byte]] = new LexicographicalOrdering
val partitionedDf = spark.createDataFrame(
  dataset
    .select(
      RecordProvider.getKeyUDF(sparkArguments.getDatasetArguments)(col(hashKeyName), col(rangeKeyName)).as("key"),
      RecordProvider.getValueUDF(avroSchema)(to_json(struct(dataset.columns.map(col): _*))).as("value"))
    .rdd
    .map(record => (record.getAs[Array[Byte]](0), record.getAs[Array[Byte]](1)))
    .repartitionAndSortWithinPartitions(new XXHashRangeBasedPartitioner(sparkArguments.getPartitionConfigurationArguments.getNumberOfPartitions))
    .map(pair => Row(pair._1, pair._2)),
  DefaultSource.SUPPORTED_SCHEMA)
As you might have already noticed, getKeyUDF and getValueUDF return Array[Byte]. I have defined a custom Partitioner, XXHashRangeBasedPartitioner, and an Ordering[Array[Byte]] to be used internally for sorting.
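For context, the LexicographicalOrdering class is not shown here; a minimal sketch of such an ordering (comparing bytes as unsigned values, with the shorter array sorting first on a common prefix) could look like this:

class LexicographicalOrdering extends Ordering[Array[Byte]] {
  override def compare(a: Array[Byte], b: Array[Byte]): Int = {
    val len = math.min(a.length, b.length)
    var i = 0
    while (i < len) {
      // & 0xff compares each byte as unsigned, matching typical key encodings
      val cmp = (a(i) & 0xff) - (b(i) & 0xff)
      if (cmp != 0) return cmp
      i += 1
    }
    // on a common prefix, the shorter array sorts first
    a.length - b.length
  }
}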
I need to drop duplicates from my dataset at some point. Considering that repartitionAndSortWithinPartitions already causes a shuffle and sorts my partitions, is there a way I can combine a dropDuplicates with it, or have a dropDuplicates that works efficiently on sorted partitions (just at the partition level) without invoking another shuffle? In other words, can I achieve deduplication, custom partitioning and sorting with just one shuffle?
A reduceByKey on the RDD or a dropDuplicates on the dataset both result in an additional shuffle. reduceByKey on the RDD has a further issue: it won't work correctly with Array[Byte] keys, since arrays compare by reference, so I would have to wrap them and provide hashCode and equals implementations.
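To illustrate the wrapping that would be needed (arrays use reference equality and identity hashCode by default), a hypothetical ByteKey wrapper with value semantics might look like:

final case class ByteKey(bytes: Array[Byte]) {
  // override the default reference-based equals/hashCode with
  // element-wise versions so equal byte sequences group together
  override def equals(other: Any): Boolean = other match {
    case ByteKey(otherBytes) => java.util.Arrays.equals(bytes, otherBytes)
    case _ => false
  }
  override def hashCode(): Int = java.util.Arrays.hashCode(bytes)
}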
So, what's the best that can be done in this situation?
Upvotes: 0
Views: 298
Reputation: 27373
You could add a mapPartitions step after repartitionAndSortWithinPartitions:
def dedup(rows: Seq[(Array[Byte], Array[Byte])]): Seq[(Array[Byte], Array[Byte])] = {
  // your deduplication logic; within a partition, rows arrive sorted by key
}
rdd
.repartitionAndSortWithinPartitions(new XXHashRangeBasedPartitioner(sparkArguments.getPartitionConfigurationArguments.getNumberOfPartitions))
.mapPartitions(rows => dedup(rows.toList).toIterator, preservesPartitioning = true)
This will not add a shuffle, and the partitioning will be preserved (though the sort order might not be, depending on your dedup logic).
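If you want to exploit the sort order directly and keep it intact, one option is to dedup lazily over the iterator instead of materializing the partition with toList: since equal keys are adjacent after the sort, dropping a row whenever its key equals the previous one keeps the first row per key. A sketch, assuming first-value-wins semantics:

def dedupSorted(rows: Iterator[(Array[Byte], Array[Byte])]): Iterator[(Array[Byte], Array[Byte])] = {
  var prevKey: Array[Byte] = null
  rows.filter { case (key, _) =>
    // equal keys are adjacent after the sort, so comparing against the
    // previous key is enough; Arrays.equals compares element-wise
    val isNew = prevKey == null || !java.util.Arrays.equals(prevKey, key)
    if (isNew) prevKey = key
    isNew
  }
}

rdd
  .repartitionAndSortWithinPartitions(new XXHashRangeBasedPartitioner(sparkArguments.getPartitionConfigurationArguments.getNumberOfPartitions))
  .mapPartitions(dedupSorted, preservesPartitioning = true)

This streams through each partition in a single pass, so both the partitioning and the sort order survive.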
Upvotes: 0