Kashyap

Reputation: 17534

DataFrame.write.parquet() uses only one executor, does not scale

I've read:


I'm trying to generate a test dataset with 10 billion rows by crossJoining a seed dataframe with some other dataframes. My problem is that the last step of the process, final_df.write.parquet(), only uses one worker/executor no matter how many there are.

This obviously doesn't scale to billions of rows, which is the problem.

E.g. in a 3-node cluster with 4 cores each, final_df has 64 partitions, but only one executor writes a single parquet file containing all the records from that dataframe. I've also tried 12 nodes, which produces a dataframe with 1936 partitions, but the problem is the same.
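One way to see what the write tasks actually receive is to count the rows that land in each partition; a minimal diagnostic sketch (not part of the job itself), using the final_df built in the code below. If all rows show up under a single partition id, only one write task has any data, regardless of what getNumPartitions() reports.

import pyspark.sql.functions as F

# Diagnostic: how many rows does each partition of final_df hold?
(final_df
    .withColumn('pid', F.spark_partition_id())
    .groupBy('pid')
    .count()
    .orderBy('pid')
    .show(200, truncate=False))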

A few observations:

import os, math
import pyspark.sql.functions as F
from datetime import datetime as dt

def log(*args):
  print(dt.now().isoformat() + ' ' + ' '.join([str(s) for s in args]))

log('spark.version', str(spark.version))

log("reading seed file")
spark.conf.set("fs.azure.account.key.myaccount.dfs.core.windows.net", "my key")
seed_df = spark.read.csv("abfss://mycontainer@myaccount.dfs.core.windows.net/seed.csv", header=True)

# NUM_RECORDS_TO_GENERATE = 10_000_000_000
NUM_RECORDS_TO_GENERATE = 2_000_000
NUM_RECORDS_TO_GENERATE = NUM_RECORDS_TO_GENERATE + (NUM_RECORDS_TO_GENERATE % seed_df.count())
array_len = int(math.sqrt(NUM_RECORDS_TO_GENERATE / seed_df.count()))

log("array_len: %s, NUM_RECORDS_TO_GENERATE: %s, seed_df.count(): %s" % (array_len, NUM_RECORDS_TO_GENERATE, seed_df.count()))

df1 = spark.createDataFrame(data=[[ [1] * array_len ]])
df2 = df1.withColumn('exploded', F.explode(df1['_1'])).drop('_1')
df3 = df2.crossJoin(df2) # contains array_len ^ 2 = NUM_RECORDS_TO_GENERATE / seed_df.count() records
newdf = df3.crossJoin(seed_df) # contains NUM_RECORDS_TO_GENERATE
final_df = newdf.withColumn('uniq_row_id', F.monotonically_increasing_id()).drop('exploded') # add unique id column

# log("repartitioning")
# final_df = final_df.repartition(int(final_df.rdd.getNumPartitions() / 2))
# log("coalesceing")
# final_df = final_df.coalesce(int(final_df.rdd.getNumPartitions() / 2))

log("final_df.rdd.getNumPartitions(): ", final_df.rdd.getNumPartitions())
log('writing parquet')
final_df.write.parquet("abfss://mycontainer@myaccount.dfs.core.windows.net/%s/parquet-%s" % (dt.now().isoformat(), NUM_RECORDS_TO_GENERATE))
log('wrote parquet.')
log('final_df.rdd.count():', final_df.rdd.count())

output

2020-12-05T00:27:51.933995 spark.version 3.0.1
2020-12-05T00:27:51.934079 reading seed file
2020-12-05T00:27:52.713461 array_len: 377, NUM_RECORDS_TO_GENERATE: 2000002, seed_df.count(): 14
2020-12-05T00:27:52.852547 final_df.rdd.getNumPartitions():  64
2020-12-05T00:27:52.852749 writing parquet
2020-12-05T00:28:00.823663 wrote parquet.
2020-12-05T00:28:08.757957 final_df.rdd.count(): 1989806

(screenshot: aggregated metrics and files produced)

coalesce

... same as above ...
2020-12-05T00:12:22.620791 coalesceing
2020-12-05T00:12:22.860093 final_df.rdd.getNumPartitions():  32
2020-12-05T00:12:22.860249 writing parquet
2020-12-05T00:12:31.280416 wrote parquet.
2020-12-05T00:12:39.204093 final_df.rdd.count(): 1989806

(screenshot: aggregated metrics and files produced)

repartition

... same as above ...
2020-12-05T00:23:40.155481 repartitioning
2020-12-05T00:23:44.702251 final_df.rdd.getNumPartitions():  8
2020-12-05T00:23:44.702421 writing parquet
2020-12-05T00:23:50.478841 wrote parquet.
2020-12-05T00:23:52.174997 final_df.rdd.count(): 1989806

(screenshot: aggregated metrics and files produced)


DAG visualization of the stage that takes a long time: (DAG chart screenshot)

PS: Ignore the slight mismatch between the NUM_RECORDS_TO_GENERATE value and the actual number of records generated. It's probably a rounding issue from the sqrt, and I don't care if it's off by a few million.
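For what it's worth, the numbers above bear that out; the int(math.sqrt(...)) truncation alone accounts for the difference:

import math

seed_count = 14                                    # seed_df.count() from the run above
target = 2_000_002                                 # NUM_RECORDS_TO_GENERATE after the % adjustment
array_len = int(math.sqrt(target / seed_count))    # 377, truncated down from ~377.96
generated = array_len ** 2 * seed_count            # rows produced by the two crossJoins
print(array_len, generated)                        # 377 1989806 -- matches final_df.rdd.count()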

Upvotes: 1

Views: 2272

Answers (1)

Kashyap

Reputation: 17534

So I solved it, but I still don't know why the old code uses only one executor.

I added a NEW STEP to repartition the original dataframe before crossJoining it with the others. After that, the resulting dataframes use all executors.
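A quick way to see what the NEW STEP changes (diagnostic only, not part of the job) is to compare how df2's rows are spread across partitions before and after the repartition:

# Diagnostic: number of rows in each partition of df2.
# Run once before and once after the NEW STEP in the code below: before the
# repartition, all array_len rows should sit in a single partition; after it,
# they should be spread roughly evenly across all partitions.
print('rows per partition:', df2.rdd.glom().map(len).collect())

Full code: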

import os, math
import pyspark.sql.functions as F
from datetime import datetime as dt

def log(*args):
  print(dt.now().isoformat() + ' ' + ' '.join([str(s) for s in args]))

log('spark.version', str(spark.version))

log("reading seed file")
spark.conf.set("fs.azure.account.key.myaccount.dfs.core.windows.net", "my key")
seed_df = spark.read.csv("abfss://mycontainer@myaccount.dfs.core.windows.net/seed.csv", header=True)

# NUM_RECORDS_TO_GENERATE = 10_000_000_000
NUM_RECORDS_TO_GENERATE = 2_000_000
NUM_RECORDS_TO_GENERATE = NUM_RECORDS_TO_GENERATE + (NUM_RECORDS_TO_GENERATE % seed_df.count())
array_len = int(math.sqrt(NUM_RECORDS_TO_GENERATE / seed_df.count()))

log("array_len: %s, NUM_RECORDS_TO_GENERATE: %s, seed_df.count(): %s" % (array_len, NUM_RECORDS_TO_GENERATE, seed_df.count()))

df1 = spark.createDataFrame(data=[[ [1] * array_len ]])
df2 = df1.withColumn('exploded', F.explode(df1['_1'])).drop('_1')

# --------------- NEW STEP ---------------
# with this final_df.write.parquet() uses all executors and scales up
df2 = df2.repartition(df2.rdd.getNumPartitions())

df3 = df2.crossJoin(df2) # contains array_len ^ 2 = NUM_RECORDS_TO_GENERATE / seed_df.count() records
newdf = df3.crossJoin(seed_df) # contains NUM_RECORDS_TO_GENERATE
final_df = newdf.withColumn('uniq_row_id', F.monotonically_increasing_id()).drop('exploded') # add unique id column

# log("repartitioning")
# final_df = final_df.repartition(int(final_df.rdd.getNumPartitions() / 2))
# log("coalesceing")
# final_df = final_df.coalesce(int(final_df.rdd.getNumPartitions() / 2))

log("final_df.rdd.getNumPartitions(): ", final_df.rdd.getNumPartitions())
log('writing parquet')
final_df.write.parquet("abfss://mycontainer@myaccount.dfs.core.windows.net/%s/parquet-%s" % (dt.now().isoformat(), NUM_RECORDS_TO_GENERATE))
log('wrote parquet.')
log('final_df.rdd.count():', final_df.rdd.count())

With the NEW STEP:

2020-12-08T17:31:25.674825 spark.version 3.0.1
2020-12-08T17:31:25.674927 reading seed file
2020-12-08T17:31:32.770631 array_len: 377, NUM_RECORDS_TO_GENERATE: 2000002, seed_df.count(): 14
2020-12-08T17:31:33.940648 uniq_df.rdd.getNumPartitions():  16
2020-12-08T17:31:33.940848 writing parquet
2020-12-08T17:31:37.658914 wrote parquet.
2020-12-08T17:31:39.612749 uniq_df.rdd.count(): 1989806



If I remove the NEW STEP:

2020-12-08T17:37:16.896377 spark.version 3.0.1
2020-12-08T17:37:16.896478 reading seed file
2020-12-08T17:37:18.303734 array_len: 377, NUM_RECORDS_TO_GENERATE: 2000002, seed_df.count(): 14
2020-12-08T17:37:18.817331 uniq_df.rdd.getNumPartitions():  256
2020-12-08T17:37:18.817558 writing parquet
2020-12-08T17:37:28.015959 wrote parquet.
2020-12-08T17:37:37.973600 uniq_df.rdd.count(): 1989806


Upvotes: 0
