user947659

Reputation: 2645

How to efficiently multiply rows in PySpark dataframe?

I am trying to make a synthetic dataset by taking an existing small dataset and making it much larger. I want the target size to be 20M rows. My current method:

from math import log

for i in range(int(log(130000, 2))):  # each pass doubles the row count
    table_copy = table_copy.unionAll(table_copy)

But this slows down A LOT after the 12th iteration (of 17). Is there a much faster way of turning a dataframe of 150 rows into one of 20M rows?
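
For reference, a quick sketch of the doubling arithmetic behind that loop (plain Python, no Spark needed; the 150-row starting point is the table mentioned above):

# each unionAll of the table with itself doubles the row count,
# so after n iterations there are 150 * 2**n rows
for n in (16, 17):
    print(n, 150 * 2 ** n)  # 16 -> 9830400, 17 -> 19660800 (just under 20M)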

Upvotes: 0

Views: 1102

Answers (2)

ZygD

Reputation: 24386

This works best:
(5 seconds = 20M rows)

import pyspark.sql.functions as F

df = spark.range(150)
factor = 135000
# repeat every row `factor` times, then drop the helper column
df = df.withColumn('a', F.expr(f'explode(array_repeat(0,{factor}))')).drop('a')

Idea proposed by this smart guy

In your case it could be just:

table_copy = table_copy.withColumn('a', F.expr('explode(array_repeat(0,135000))')).drop('a')
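
To sanity-check the result (a sketch, assuming a running SparkSession and that `table_copy` is the 150-row table from the question):

import pyspark.sql.functions as F

# 150 source rows * 135000 repeats = 20,250,000 rows
big = table_copy.withColumn('a', F.expr('explode(array_repeat(0,135000))')).drop('a')
print(big.count())  # roughly the 20M target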

Other tested options

(16 seconds = 1.5M rows):

import pyspark.sql.functions as F
df = spark.range(150)
df = df.withColumn('array', F.explode(F.array(*map(F.lit, range(1000)))))
df = df.drop('array')

(11 seconds = 38k rows):

def union_self(df, p):
    # recursively union the dataframe with itself, doubling the row count p times
    if p:
        df = union_self(df, p - 1)
        return df.union(df)
    return df

df = spark.range(150)
df = union_self(df, 8)

(16 seconds = 38k rows):

from functools import reduce
df = spark.range(150)
df = reduce(lambda df1, df2: df1.union(df2), [df] * 256)

Upvotes: 1

Se ven

Reputation: 405

If I understand right, you want to expand or scale up the dataset by replicating the same data multiple times:

val replicas = 5 // calculate this yourself; I've never tried 20M
val dsReplicated = ds.flatMap(a => 0 until replicas map ((a, _))).map(_._1)

or for a DataFrame:

import org.apache.spark.sql.functions
import spark.implicits._  // needed for the $"column" syntax

val dfReplicated = df
      .withColumn("__temporarily__", functions.typedLit((0 until replicas).toArray))
      .withColumn("idx", functions.explode($"__temporarily__"))
      .drop($"__temporarily__")
      .drop($"idx")

Upvotes: 0
