I'm trying to generate a test dataset with 10 billion rows by crossJoining a seed dataframe with some other dataframes. My problem is that the last step of the process, final_df.write.parquet(), only uses one worker/executor no matter how many there are. This obviously doesn't scale to billions of rows, which is the problem.
For example, on a 3-node cluster with 4 cores each, final_df has 64 partitions, but only one executor writes a single parquet file with all the records from that dataframe. I've also tried 12 nodes, which produces a dataframe with 1936 partitions, but the problem is the same.
A few observations:
- The partitions are created by the crossJoin(). One node is the master, leaving 2 executors, each with 4 processors, so it comes out to: 2*2*4*4 = 64.
- If I add a .coalesce(), it reduces the number of partitions to 8, but still only one executor writes one parquet file with all the records from that dataframe.
- If I use repartition() instead (no coalesce()), then I get 8 files, all executors are used and the writing is distributed perfectly. BUT then the problem just moves to the repartitioning step, which is done by a single executor. Same problem in the end. (A small sketch of these partition counts is at the end of this question.)
Here's the code:
import os, math
import pyspark.sql.functions as F
from datetime import datetime as dt
def log(*args):
    print(dt.now().isoformat() + ' ' + ' '.join([str(s) for s in args]))
log('spark.version', str(spark.version))
log("reading seed file")
spark.conf.set("fs.azure.account.key.myaccount.dfs.core.windows.net", "my key")
seed_df = spark.read.csv("abfss://[email protected]/seed.csv", header=True)
# NUM_RECORDS_TO_GENERATE = 10_000_000_000
NUM_RECORDS_TO_GENERATE = 2_000_000
NUM_RECORDS_TO_GENERATE = NUM_RECORDS_TO_GENERATE + (NUM_RECORDS_TO_GENERATE % seed_df.count())
array_len = int(math.sqrt(NUM_RECORDS_TO_GENERATE / seed_df.count()))
log("array_len: %s, NUM_RECORDS_TO_GENERATE: %s, seed_df.count(): %s" % (array_len, NUM_RECORDS_TO_GENERATE, seed_df.count()))
df1 = spark.createDataFrame(data=[[ [1] * array_len ]])
df2 = df1.withColumn('exploded', F.explode(df1['_1'])).drop('_1')
df3 = df2.crossJoin(df2) # contains array_len ^ 2 = NUM_RECORDS_TO_GENERATE / seed_df.count() records
newdf = df3.crossJoin(seed_df) # contains NUM_RECORDS_TO_GENERATE
final_df = newdf.withColumn('uniq_row_id', F.monotonically_increasing_id()).drop('exploded') # add unique id column
# log("repartitioning")
# final_df = final_df.repartition(int(final_df.rdd.getNumPartitions() / 2))
# log("coalesceing")
# final_df = final_df.coalesce(int(final_df.rdd.getNumPartitions() / 2))
log("final_df.rdd.getNumPartitions(): ", final_df.rdd.getNumPartitions())
log('writing parquet')
final_df.write.parquet("abfss://[email protected]/%s/parquet-%s" % (dt.now().isoformat(), NUM_RECORDS_TO_GENERATE))
log('wrote parquet.')
log('final_df.rdd.count():', final_df.rdd.count())
2020-12-05T00:27:51.933995 spark.version 3.0.1
2020-12-05T00:27:51.934079 reading seed file
2020-12-05T00:27:52.713461 array_len: 377, NUM_RECORDS_TO_GENERATE: 2000002, seed_df.count(): 14
2020-12-05T00:27:52.852547 final_df.rdd.getNumPartitions(): 64
2020-12-05T00:27:52.852749 writing parquet
2020-12-05T00:28:00.823663 wrote parquet.
2020-12-05T00:28:08.757957 final_df.rdd.count(): 1989806
... same as above ...
2020-12-05T00:12:22.620791 coalesceing
2020-12-05T00:12:22.860093 final_df.rdd.getNumPartitions(): 32
2020-12-05T00:12:22.860249 writing parquet
2020-12-05T00:12:31.280416 wrote parquet.
2020-12-05T00:12:39.204093 final_df.rdd.count(): 1989806
... same as above ...
2020-12-05T00:23:40.155481 repartitioning
2020-12-05T00:23:44.702251 final_df.rdd.getNumPartitions(): 8
2020-12-05T00:23:44.702421 writing parquet
2020-12-05T00:23:50.478841 wrote parquet.
2020-12-05T00:23:52.174997 final_df.rdd.count(): 1989806
DAG visualization of the stage that takes a long time:
PS: Ignore the slight mismatch between the NUM_RECORDS_TO_GENERATE value and the actual number of records generated. It's probably a math problem in the sqrt, and I don't care if it's off by a few million.
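For reference, here is the small sketch mentioned in the observations above. It only prints partition counts on toy data (reusing the same single-row-plus-explode shape, not the real job), and the exact numbers depend on defaultParallelism and on which physical join Spark picks for the crossJoin:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # reuses the notebook's session if there is one
print('defaultParallelism:', spark.sparkContext.defaultParallelism)

toy1 = spark.createDataFrame(data=[[ [1] * 377 ]])                   # same shape as df1 above
toy2 = toy1.withColumn('exploded', F.explode(toy1['_1'])).drop('_1')
toy3 = toy2.crossJoin(toy2)

print('toy2 partitions:', toy2.rdd.getNumPartitions())               # typically defaultParallelism
print('toy3 partitions:', toy3.rdd.getNumPartitions())               # left * right when Spark uses a cartesian product
print('coalesce(8):    ', toy3.coalesce(8).rdd.getNumPartitions())   # 8, no shuffle
print('repartition(8):', toy3.repartition(8).rdd.getNumPartitions()) # 8, with a shuffle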
Upvotes: 1
Views: 2272
So I solved it, but I still don't know for sure why the old code used only one executor (my best guess is in the note after the output below).
I added a NEW STEP to repartition the original dataframe before crossJoining it with the others. After that, the resulting dataframes use all executors.
import os, math
import pyspark.sql.functions as F
from datetime import datetime as dt
def log(*args):
    print(dt.now().isoformat() + ' ' + ' '.join([str(s) for s in args]))
log('spark.version', str(spark.version))
log("reading seed file")
spark.conf.set("fs.azure.account.key.myaccount.dfs.core.windows.net", "my key")
seed_df = spark.read.csv("abfss://[email protected]/seed.csv", header=True)
# NUM_RECORDS_TO_GENERATE = 10_000_000_000
NUM_RECORDS_TO_GENERATE = 2_000_000
NUM_RECORDS_TO_GENERATE = NUM_RECORDS_TO_GENERATE + (NUM_RECORDS_TO_GENERATE % seed_df.count())
array_len = int(math.sqrt(NUM_RECORDS_TO_GENERATE / seed_df.count()))
log("array_len: %s, NUM_RECORDS_TO_GENERATE: %s, seed_df.count(): %s" % (array_len, NUM_RECORDS_TO_GENERATE, seed_df.count()))
df1 = spark.createDataFrame(data=[[ [1] * array_len ]])
df2 = df1.withColumn('exploded', F.explode(df1['_1'])).drop('_1')
# --------------- NEW STEP ---------------
# with this final_df.write.parquet() uses all executors and scales up
df2 = df2.repartition(df2.rdd.getNumPartitions())
df3 = df2.crossJoin(df2) # contains array_len ^ 2 = NUM_RECORDS_TO_GENERATE / seed_df.count() records
newdf = df3.crossJoin(seed_df) # contains NUM_RECORDS_TO_GENERATE
final_df = newdf.withColumn('uniq_row_id', F.monotonically_increasing_id()).drop('exploded') # add unique id column
# log("repartitioning")
# final_df = final_df.repartition(int(final_df.rdd.getNumPartitions() / 2))
# log("coalesceing")
# final_df = final_df.coalesce(int(final_df.rdd.getNumPartitions() / 2))
log("final_df.rdd.getNumPartitions(): ", final_df.rdd.getNumPartitions())
log('writing parquet')
final_df.write.parquet("abfss://[email protected]/%s/parquet-%s" % (dt.now().isoformat(), NUM_RECORDS_TO_GENERATE))
log('wrote parquet.')
log('final_df.rdd.count():', final_df.rdd.count())
2020-12-08T17:31:25.674825 spark.version 3.0.1
2020-12-08T17:31:25.674927 reading seed file
2020-12-08T17:31:32.770631 array_len: 377, NUM_RECORDS_TO_GENERATE: 2000002, seed_df.count(): 14
2020-12-08T17:31:33.940648 uniq_df.rdd.getNumPartitions(): 16
2020-12-08T17:31:33.940848 writing parquet
2020-12-08T17:31:37.658914 wrote parquet.
2020-12-08T17:31:39.612749 uniq_df.rdd.count(): 1989806
2020-12-08T17:37:16.896377 spark.version 3.0.1
2020-12-08T17:37:16.896478 reading seed file
2020-12-08T17:37:18.303734 array_len: 377, NUM_RECORDS_TO_GENERATE: 2000002, seed_df.count(): 14
2020-12-08T17:37:18.817331 uniq_df.rdd.getNumPartitions(): 256
2020-12-08T17:37:18.817558 writing parquet
2020-12-08T17:37:28.015959 wrote parquet.
2020-12-08T17:37:37.973600 uniq_df.rdd.count(): 1989806
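I still can't say for certain, but my best guess is that the single-row createDataFrame() puts all the data into one partition (the rest are empty), explode() keeps that layout, and the crossJoins inherit it, so only one task in every downstream stage has any data to process. A small check that makes this visible (toy numbers again, not the full job, run in the same session):
import pyspark.sql.functions as F

toy1 = spark.createDataFrame(data=[[ [1] * 377 ]])
toy2 = toy1.withColumn('exploded', F.explode(toy1['_1'])).drop('_1')

# rows per partition before the NEW STEP: one partition holds everything
print(toy2.rdd.glom().map(len).collect())

# rows per partition after the NEW STEP: the shuffle spreads them out, so the
# crossJoins and the final write can run on all executors
print(toy2.repartition(8).rdd.glom().map(len).collect())
If that's right, it would also explain why repartitioning final_df alone just moved the bottleneck: the shuffle that spreads the rows still has to read them all out of that one non-empty partition.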
Upvotes: 0