daniel9x

Reputation: 805

How do I parallelize writing a list of Pyspark dataframes across all worker nodes?

I have a basic AWS Glue 4.0 Job I'm trying to run that runs a transform function and returns a list of dataframes:

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from transform import transform
from pyspark.sql.functions import lit
from datetime import datetime

# ========================================== CONTEXT INITIALIZATION ====================================================
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# ======================================================================================================================



pyspark_df_list = transform(inputs)


# NOT SURE what to do here to achieve parallelization

# ======================================================================================================================

job.commit()

Things I've tried:

  1. Iterating through the list and writing each dataframe in turn seems to break the parallelization and essentially makes the write operations single-threaded.
  2. Creating a write_df function and calling it via parallelize/foreach on the SparkContext gives me the following error (a sketch of this attempt follows the error message):

RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
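For reference, the second attempt looked roughly like this (reconstructed; the write path and exact names are illustrative, not my real script):

def write_df(df):
    df.write.mode("overwrite").parquet("s3://my-bucket/output/")

# Fails with SPARK-5063: each DataFrame holds a reference to the SparkContext,
# which cannot be serialized into tasks that run on the workers.
sc.parallelize(pyspark_df_list).foreach(write_df)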

Upvotes: 1

Views: 833

Answers (2)

Ged

Reputation: 18098

Actually, you are already parallelizing at the level of each individual dataframe: its write is distributed across the cluster.

  1. In Scala you can use .par. That said, it depends on the resource allocation mode you use with your Resource Manager, e.g. YARN.
  2. Alternatively, you can write N Spark apps.
  3. Or you can have multiple independent (N) actions in a single Spark app, over N arrays. This applies to both Scala and PySpark.
  4. For PySpark you can use a ThreadPoolExecutor; see https://gist.github.com/pavel-filatov/87a68dd621546b9cac1e0d2ea269705f for an excellent explanation of the .par equivalent in PySpark. A minimal sketch of this approach follows below.
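A minimal sketch of point 4, assuming each entry in pyspark_df_list is a DataFrame; the pool size and output paths are placeholders:

from concurrent.futures import ThreadPoolExecutor

def write_df(df, path):
    # Each call is its own Spark action; Spark still distributes the underlying
    # write tasks across the cluster. The threads just let the driver submit
    # several actions concurrently instead of one after another.
    df.write.mode("overwrite").parquet(path)

with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [
        pool.submit(write_df, df, f"s3://my-bucket/output/table_{i}/")
        for i, df in enumerate(pyspark_df_list)
    ]
    for f in futures:
        f.result()  # re-raise any exception from the worker threads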

That said, I am not sure I would go down the latter or the .par approach.

Upvotes: 0

Charlie Flowers

Reputation: 1380

Use reduce/unionByName to create a single DataFrame and then write that. If that still doesn't achieve the degree of parallelism you require, use repartition to create more partitions in your data (1 partition = 1 task during the write):

from functools import reduce

df = reduce(lambda x, y: x.unionByName(y), pyspark_df_list)

df.repartition(128).write.format(...
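A fleshed-out version of that write might look like this (the output path, format, and partition count are assumptions, adjust to your target):

# 128 partitions -> up to 128 parallel write tasks; path and format are examples.
df.repartition(128).write.mode("overwrite").parquet("s3://my-bucket/output/combined/")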

Upvotes: 0
