Reputation: 235
I'm new to PySpark. I want to add a new column that takes multiple values and then partition the dataframe by those values.
import math
from pyspark.sql import functions as F

coun = df.count()
if coun <= 20000:
    chunksize = 2
    rowsperchunk = math.ceil(coun / 2)
else:
    chunksize = math.ceil(coun / 20000)
    rowsperchunk = 20000

for i in range(chunksize):
    # limit() always takes rows from the start of the dataframe,
    # so every iteration tags the same first rowsperchunk rows
    df.limit(rowsperchunk).withColumn('chunk', F.lit(i))
In the for loop above, every iteration only tags the first rowsperchunk rows, so a single chunk value gets applied up to the limit.
Example: I have 100k rows in my dataframe, so chunksize will be 5 and rowsperchunk will be 20,000. I need to add a new column where the first 20,000 rows get the value 1, the next 20,000 rows get the value 2, and so on until the end of chunksize. Then I want to partition based on the new column we created.
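For illustration, the mapping I'm after is just integer division of a row's global position by the chunk size. A minimal pure-Python sketch of that arithmetic (the names mirror my code above and are not part of any Spark API):

rowsperchunk = 20000

# Intended mapping: global row position (0-based) -> chunk value (1-based)
for row_index in [0, 19999, 20000, 59999, 99999]:
    chunk = row_index // rowsperchunk + 1
    print(row_index, chunk)  # 0->1, 19999->1, 20000->2, 59999->3, 99999->5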
Upvotes: 2
Views: 1307
Reputation: 10406
So you want to repartition the data into partitions of the same size while preserving the order.
That is not so easy in Spark. What I would do is start by counting the size of each partition. Then, for each partition, I would compute the number of records that sit in earlier partitions of the dataframe. With that cumulative count and the rank of the record within its partition (partition_rank), dividing by the desired partition size gives the new allocation. Note that I introduce an index column to compute the rank and preserve the order. Here is the code:
partition_size = 20000

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# count the number of rows in each physical partition
part_counts = df.withColumn("p", F.spark_partition_id()).groupBy("p").count().collect()
part_counts.sort()
part_counts = [(x[0], x[1]) for x in part_counts]

# for each partition, the number of records in all previous partitions
cum_part_counts = []
running_total = 0
for index, count in part_counts:
    cum_part_counts.append((index, running_total))
    running_total += count
cum_part_counts_df = spark.createDataFrame(cum_part_counts, ['partition_index', 'count'])

# cumulative count + (rank - 1) is the row's global 0-based position;
# floor-dividing it by partition_size yields the new partition number
repartitioned_df = df\
    .withColumn("partition_index", F.spark_partition_id())\
    .withColumn("index", F.monotonically_increasing_id())\
    .withColumn("partition_rank", F.rank().over(
        Window.partitionBy("partition_index").orderBy("index")))\
    .join(cum_part_counts_df, ['partition_index'])\
    .withColumn("new_partition",
        F.floor((F.col("count") + F.col("partition_rank") - 1) / partition_size))\
    .orderBy("index")

repartitioned_df.write.partitionBy("new_partition").parquet("...")
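As a quick sanity check (a sketch, assuming the same spark session and the same output path as above), you can read the result back and confirm that every new_partition holds partition_size rows, except possibly the last one:

# illustrative check: counts per new_partition should all be partition_size,
# except the final partition, which holds the remainder
result = spark.read.parquet("...")
result.groupBy("new_partition").count().orderBy("new_partition").show()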
Upvotes: 1