S S

Reputation: 235

How to add a new column in PySpark and fill it with different values based on row position?

I'm new to PySpark. I want to add a new column that holds a different value for each block of rows, and then partition the data by that column.

import math
from pyspark.sql import functions as F

coun = df.count()

if coun <= 20000:
    chunksize = 2
    rowsperchunk = math.ceil(coun / 2)
else:
    chunksize = math.ceil(coun / 20000)
    rowsperchunk = 20000

for i in range(1, chunksize + 1):
    df.limit(rowsperchunk).withColumn('chunk', F.lit(i))

The for loop above only inserts a single value, and only for the rows up to the limit.

Example: I have 100k rows in my DataFrame, so the chunk size will be 5 and the rows per chunk will be 20,000. In the new column, the first 20,000 rows should get the value 1, the next 20,000 rows the value 2, and so on up to the chunk size. Then I want to partition the data based on the new column.

Upvotes: 2

Views: 1307

Answers (1)

Oli

Reputation: 10406

So you want to repartition the data into partitions of the same size while preserving the order.

It is not that easy in Spark. What I would do is start by counting the number of records in each partition. Then, for each partition, I would compute the number of records that the dataframe holds in all the previous partitions. Adding that number to the rank of the record within its partition (partition_rank) gives the record's position in the whole dataframe, and dividing it by the desired partition size gives the new allocation. Note that I introduce an index column to compute the rank and preserve the order. Here is the code:

from pyspark.sql import Window
from pyspark.sql import functions as F

partition_size = 20000

# Count the records in each current Spark partition, sorted by partition id.
part_counts = df.withColumn("p", F.spark_partition_id()).groupBy("p").count().collect()
part_counts = sorted((row["p"], row["count"]) for row in part_counts)

# For each partition, store the number of records in all previous partitions.
cum_part_counts = []
running_total = 0
for index, count in part_counts:
    cum_part_counts.append((index, running_total))
    running_total += count
cum_part_counts_df = spark.createDataFrame(cum_part_counts, ['partition_index', 'offset'])

# offset + partition_rank - 1 is the 0-based position of the record in the dataframe.
repartitioned_df = df\
  .withColumn("partition_index", F.spark_partition_id())\
  .withColumn("index", F.monotonically_increasing_id())\
  .withColumn("partition_rank", F.rank().over(
           Window.partitionBy("partition_index").orderBy("index")))\
  .join(cum_part_counts_df, ['partition_index'])\
  .withColumn("new_partition",
      F.floor((F.col("offset") + F.col("partition_rank") - 1)/partition_size))\
  .orderBy("index")\
  .write.partitionBy("new_partition").parquet("...")
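
To make the arithmetic easier to check, here is a minimal plain-Python sketch of the same allocation rule, without Spark. The function name chunk_ids and the sample partition sizes are hypothetical, chosen only for illustration. It assumes the rank within a partition is 1-based, as it is with F.rank().

```python
from itertools import accumulate

def chunk_ids(partition_sizes, chunk_size):
    """Return the chunk id of every row, in order, given per-partition row counts."""
    # Number of rows that precede each partition: [0, s0, s0 + s1, ...]
    offsets = [0] + list(accumulate(partition_sizes))[:-1]
    ids = []
    for offset, size in zip(offsets, partition_sizes):
        for rank in range(1, size + 1):  # 1-based rank inside the partition
            # offset + rank - 1 is the 0-based position in the whole dataset
            ids.append((offset + rank - 1) // chunk_size)
    return ids

# Hypothetical layout: three uneven partitions, 10 rows in total, chunks of 4 rows.
ids = chunk_ids([3, 5, 2], chunk_size=4)
print(ids)  # [0, 0, 0, 0, 1, 1, 1, 1, 2, 2]
```

The chunk ids start at 0. If you want them to start at 1, as in the question, add 1 to the result of the division.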

Upvotes: 1
