Reputation: 7775
EDIT 2022/02/18: I returned to this problem after a few years, and I believe my new solution below is substantially more performant than the current highest-voted solution.
Suppose I have a DataFrame with a column partition_id:
n_partitions = 2
df = spark.sparkContext.parallelize([
[1, 'A'],
[1, 'B'],
[2, 'A'],
[2, 'C']
]).toDF(('partition_id', 'val'))
How can I repartition the DataFrame to guarantee that each value of partition_id goes to a separate partition, and that there are exactly as many actual partitions as there are distinct values of partition_id?
If I do a hash partition, i.e. df.repartition(n_partitions, 'partition_id'), that guarantees the right number of partitions, but some partitions may be empty and others may contain multiple values of partition_id due to hash collisions.
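For reference, one way to inspect which hash bucket each partition_id would land in (assuming hash partitioning behaves like pyspark.sql.functions.hash modulo the partition count, which is what Spark's hash partitioner does) is:
import pyspark.sql.functions as F

# Sketch: compute the non-negative hash bucket each distinct partition_id would map to.
buckets = (
    df.select("partition_id").distinct()
    .withColumn(
        "bucket",
        (F.hash("partition_id") % n_partitions + n_partitions) % n_partitions,
    )
)
buckets.show()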
Upvotes: 7
Views: 7357
Reputation: 7775
The previously accepted solution, which requires converting from a DataFrame to an RDD and back, works, but is quite slow due to the conversions and repartitioning required.
The solution given below is much more performant -- the additional Spark operations are quite fast, so overall it doesn't require any more compute/shuffle than a naive repartition.
At a high level, we use an iterative algorithm to invert Spark's partition hashing, then use this inverted map to create new partition keys which (when partitioned upon) give the expected distribution of partitions.
import itertools
from pyspark.sql import Row
import pyspark.sql.functions as F
def construct_reverse_hash_map(spark, n_partitions, fact = 10):
"""
Given a target number of partitions, this function constructs a
mapping from each integer partition ID (0 through N-1) to an
arbitrary integer, which Spark will hash to that partition ID.
By using these new (seemingly arbitrary) integers as a column
to repartition on, one can guarantee a 1-to-1 mapping of
partition levels to the final partitions.
Example return value, for n_partitions=10:
{
5: 80,
9: 90,
8: 94,
7: 99,
0: 92,
1: 98,
6: 87,
2: 91,
3: 85,
4: 93
}
If one had a column in a dataframe with 10 unique values, 80, 90, 94,
etc, and then partitioned on this column into 10 partitions, then
every row with value 80 would go into partition 5, every row with
value 90 would go into partition 9, and so on.
:param spark: SparkSession object
:param n_partitions: desired number of unique partitions
:param fact: initial search space of IDs will be n_partitions*fact
:return: dictionary mapping from sequential partition IDs to hashed
partition IDs.
"""
max_retries = 10
for i in range(max_retries):
bigger_factor = fact * 2 ** i
hashes = (
            spark.createDataFrame([Row(orig_id=x) for x in range(n_partitions * bigger_factor)])
.withColumn("h", F.hash("orig_id") % n_partitions)
.select("orig_id", F.when(F.col("h") >= 0, F.col("h")).otherwise(F.col("h") + n_partitions).alias("new_id"))
)
n_unique_ids = hashes.groupBy("new_id").count().count()
if n_unique_ids == n_partitions:
# find a mapping between the hashed values and the original partition IDs
return {row["new_id"]: row["orig_id"] for row in hashes.collect()}
raise Exception("Spark reverse hash algorithm failed to converge")
def add_deterministic_1to1_partitioner(df, original_part_col, new_part_col, part_levels):
"""
Returns a DataFrame with a new column which can be repartitioned on to give exactly the desired partitions. We determine what
values this column will have by inverting Spark's hash.
:param df: original DataFrame
:param original_part_col: logical column to be repartitioned on
:param new_part_col: new column to be actually repartitioned on
:param part_levels: list of unique values of part_col
:return: original DataFrame plus new column for repartitioning
"""
part_level_map = {part_level: i for i, part_level in enumerate(part_levels)}
part_level_map_expr = F.create_map(*[F.lit(x) for x in itertools.chain(*list(part_level_map.items()))])
hash_map = construct_reverse_hash_map(df.sql_ctx.sparkSession, len(part_level_map))
hash_map_expr = F.create_map(*[F.lit(x) for x in itertools.chain(*list(hash_map.items()))])
return (
# convert partition level to sequential numeric partition ID
df.withColumn("__part_id__", part_level_map_expr[F.col(original_part_col)].astype("bigint"))
        # add col which will result in 1-to-1 partitioning when repartitioned on
.withColumn(new_part_col, hash_map_expr[F.col("__part_id__")].astype("bigint"))
.drop("__part_id__")
)
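As a quick sanity check, you can verify that every value returned by construct_reverse_hash_map really hashes back to its partition ID. This is just an illustrative sketch; it assumes a SparkSession named spark and reuses the Row and F imports from above:
# Sanity check (illustrative): each mapped integer should hash back to its partition ID.
n = 8
hash_map = construct_reverse_hash_map(spark, n)
check = spark.createDataFrame([Row(part_id=k, mapped=v) for k, v in hash_map.items()])
check = check.withColumn("bucket", (F.hash("mapped") % n + n) % n)
assert check.filter(F.col("bucket") != F.col("part_id")).count() == 0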
Demonstrating the functionality:
# construct example DataFrame
data = [
[1, 'A0'],
[1, 'A1'],
[2, 'B0'],
[2, 'B1'],
[3, 'C0'],
[3, 'C1'],
]
partition_levels = list(set([pid for pid, _ in data]))
n_partitions = len(partition_levels)
df = spark.sparkContext.parallelize(data).toDF(('partition_id', 'val'))
A naive repartitioning on the desired partition column results in collisions -- note that rows with partition IDs 1 and 2 are both slotted into partition 2:
df_naive_repartition = df.repartition(n_partitions, "partition_id").withColumn("actual_partition_id", F.spark_partition_id())
df_naive_repartition.orderBy("partition_id", "val").show()
#+------------+---+-------------------+
#|partition_id|val|actual_partition_id|
#+------------+---+-------------------+
#| 1| A0| 2|
#| 1| A1| 2|
#| 2| B0| 2|
#| 2| B1| 2|
#| 3| C0| 0|
#| 3| C1| 0|
#+------------+---+-------------------+
Whereas adding the deterministic partition key and then repartitioning by it results in each group being assigned to exactly one partition:
df = add_deterministic_1to1_partitioner(df, "partition_id", "deterministic_partition_id", partition_levels)
df_1to1_repartition = df.repartition(n_partitions, "deterministic_partition_id").withColumn("actual_partition_id", F.spark_partition_id())
df_1to1_repartition.orderBy("partition_id", "val").show()
#+------------+---+--------------------------+-------------------+
#|partition_id|val|deterministic_partition_id|actual_partition_id|
#+------------+---+--------------------------+-------------------+
#| 1| A0| 28| 0|
#| 1| A1| 28| 0|
#| 2| B0| 29| 1|
#| 2| B1| 29| 1|
#| 3| C0| 27| 2|
#| 3| C1| 27| 2|
#+------------+---+--------------------------+-------------------+
(The deterministic_partition_id column can be dropped after the repartitioning -- I show it here only to add a little clarity about how the hash map function works.)
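For example, once the data has the desired distribution, the helper columns can simply be dropped (df_final is just an illustrative name):
# Drop the columns that were only needed for (and for demonstrating) the repartitioning.
df_final = df_1to1_repartition.drop("deterministic_partition_id", "actual_partition_id")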
Upvotes: 1
Reputation: 35249
There is no such option with Python and the DataFrame API. The partitioning API in Dataset is not pluggable and supports only the predefined range and hash partitioning schemes.
You can convert the data to an RDD, partition it with a custom partitioner, and convert it back to a DataFrame:
from pyspark.sql.functions import col, struct, spark_partition_id
mapping = {k: i for i, k in enumerate(
df.select("partition_id").distinct().rdd.flatMap(lambda x: x).collect()
)}
result = (df
.select("partition_id", struct([c for c in df.columns]))
.rdd.partitionBy(len(mapping), lambda k: mapping[k])
.values()
.toDF(df.schema))
result.withColumn("actual_partition_id", spark_partition_id()).show()
# +------------+---+-------------------+
# |partition_id|val|actual_partition_id|
# +------------+---+-------------------+
# | 1| A| 0|
# | 1| B| 0|
# | 2| A| 1|
# | 2| C| 1|
# +------------+---+-------------------+
Please remember that this only creates a specific distribution of the data and doesn't set a partitioner that can be used by the Catalyst optimizer.
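For instance, a later aggregation on partition_id will still plan a shuffle; you can confirm this by inspecting the physical plan (look for an Exchange node):
# Catalyst has no partitioning information for `result`, so this still plans an Exchange.
result.groupBy("partition_id").count().explain()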
Upvotes: 15