Nisarg

Reputation: 109

repartitionAndSortWithinPartitions & remove duplicates with a single shuffle

I have a use case wherein I need to use a custom partitioner and eventually sort the partitions.

    implicit val ordering: Ordering[Array[Byte]] = new LexicographicalOrdering

    val partitionedDf = spark.createDataFrame(
      dataset
        .select(
          RecordProvider.getKeyUDF(sparkArguments.getDatasetArguments)(col(hashKeyName), col(rangeKeyName)).as("key"),
          RecordProvider.getValueUDF(avroSchema)(to_json(struct(dataset.columns.map(col): _*))).as("value"))
        .rdd
        .map(record => (record.getAs[Array[Byte]](0), record.getAs[Array[Byte]](1)))
        .repartitionAndSortWithinPartitions(
          new XXHashRangeBasedPartitioner(sparkArguments.getPartitionConfigurationArguments.getNumberOfPartitions))
        .map(pair => Row(pair._1, pair._2)),
      DefaultSource.SUPPORTED_SCHEMA)

As you might have already noticed, getKeyUDF and getValueUDF return Array[Byte]. I have defined a custom Partitioner, XXHashRangeBasedPartitioner, and an Ordering[Array[Byte]] (the LexicographicalOrdering above) to be used internally for sorting.
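For reference, a byte-wise ordering along these lines would do the job. This is only a minimal sketch assuming plain unsigned lexicographic comparison, since the actual LexicographicalOrdering body isn't shown here:

    class LexicographicalOrdering extends Ordering[Array[Byte]] {
      override def compare(a: Array[Byte], b: Array[Byte]): Int = {
        val len = math.min(a.length, b.length)
        var i = 0
        while (i < len) {
          // & 0xff compares the bytes as unsigned values (0..255)
          val cmp = (a(i) & 0xff) - (b(i) & 0xff)
          if (cmp != 0) return cmp
          i += 1
        }
        // when one key is a prefix of the other, the shorter one sorts first
        a.length - b.length
      }
    }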

I need to drop duplicates from my dataset at some point. Considering that repartitionAndSortWithinPartitions already causes a shuffle and sorts my partitions, is there a way I can combine a dropDuplicates with it, or have a dropDuplicates that works efficiently on already-sorted partitions (at the partition level only) without invoking another shuffle? So, basically: achieve duplicate removal, custom partitioning, and sorting with just one shuffle.

A reduceByKey on the RDD or a dropDuplicates on the dataset both result in an additional shuffle. reduceByKey on the RDD has another issue: it won't work with Array[Byte] keys, so I would have to wrap them and provide hashCode and equals implementations.
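For illustration, the wrapper that the reduceByKey route would require might look like the following; BytesKey is a hypothetical name, and this route still costs the extra shuffle:

    // Key wrapper giving Array[Byte] value-based equality so reduceByKey groups correctly
    final class BytesKey(val bytes: Array[Byte]) extends Serializable {
      override def equals(other: Any): Boolean = other match {
        case that: BytesKey => java.util.Arrays.equals(bytes, that.bytes)
        case _              => false
      }
      override def hashCode: Int = java.util.Arrays.hashCode(bytes)
    }

    // rdd.map { case (k, v) => (new BytesKey(k), v) }
    //    .reduceByKey((v1, _) => v1) // keep one value per key; still a second shuffle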

So, what's the best that can be done in this situation?

Upvotes: 0

Views: 298

Answers (1)

Raphael Roth

Reputation: 27373

You could add a mapPartitions step after repartitionAndSortWithinPartitions:

    def dedup(rows: Seq[(Array[Byte], Array[Byte])]): Seq[(Array[Byte], Array[Byte])] = {
      // your deduplication logic; e.g. since the rows arrive sorted, keep the first
      // row of each run of equal keys (Array[Byte] needs sameElements, not ==)
      rows.foldLeft(List.empty[(Array[Byte], Array[Byte])]) { (acc, row) =>
        if (acc.nonEmpty && acc.head._1.sameElements(row._1)) acc else row :: acc
      }.reverse
    }

    rdd
      .repartitionAndSortWithinPartitions(new XXHashRangeBasedPartitioner(sparkArguments.getPartitionConfigurationArguments.getNumberOfPartitions))
      .mapPartitions(rows => dedup(rows.toList).toIterator, preservesPartitioning = true)

This will not add a shuffle, and the partitioning will be preserved (though possibly not the sorting, depending on your dedup logic).
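If partitions are large, you could also dedup the sorted iterator lazily instead of materializing it with toList. This is a sketch under the same assumptions; since it keeps the first row of each run of equal keys, the within-partition sort order is preserved as well:

    rdd
      .repartitionAndSortWithinPartitions(new XXHashRangeBasedPartitioner(sparkArguments.getPartitionConfigurationArguments.getNumberOfPartitions))
      .mapPartitions({ rows =>
        var prev: Array[Byte] = null
        // rows are sorted, so duplicates are adjacent: keep the first of each run
        rows.filter { case (key, _) =>
          val keep = prev == null || !java.util.Arrays.equals(prev, key)
          prev = key
          keep
        }
      }, preservesPartitioning = true)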

Upvotes: 0
