Dark Shadows

Reputation: 65

How to run Python code across nodes in an EMR cluster

I have an Amazon EMR cluster with 30 nodes. My Python code looks like this:

from pyspark.sql import SparkSession

# sparkConf is assumed to be defined earlier in the script
spark = SparkSession \
        .builder \
        .appName("App") \
        .config(conf=sparkConf) \
        .getOrCreate()

# Pull one category into pandas on the driver and write it to a local CSV
def fetchCatData(cat, tableName):
    df_gl = spark.sql("select * from {} where category = {}".format(tableName, cat))
    df_pandas = df_gl.select("*").toPandas()
    df_pandas.to_csv("/tmp/split/{}_{}.csv".format(tableName, cat))

catList = [14, 15, 63, 65, 74, 21, 23, 60, 79, 86, 107, 147, 196, 199, 200, 201, 229, 263, 265, 267, 328, 421, 468, 469,504]
tableList = ["Table1","Table2"
             ,"Table3",
             "Table4", "Table5", "Table6",
             "Table7"
             ]

def main(args):
    log4jLogger = spark._jvm.org.apache.log4j
    LOGGER = log4jLogger.LogManager.getLogger(__name__)

    for table in tableList:
        LOGGER.info("Starting Split for {}".format(table))
        dataLocation = "s3://test/APP/{}".format(table)
        df = spark.read.parquet(dataLocation)
        df = df.repartition("CATEGORY").cache()
        df.createOrReplaceTempView(table)
        for cat in catList:
            fetchCatData(cat, table)

I want to solve the following problem:

  1. Basically, I want to read my Parquet data, divide it by category, and store each category as a pandas DataFrame written to CSV.
  2. Currently I am running this sequentially; I want to run it in parallel, with each category processed on a node in EMR.
  3. I tried using multiprocessing, but I am not happy with the results.

What is the best way to solve this problem in the least amount of time?

Upvotes: 0

Views: 1056

Answers (1)

RonD

Reputation: 85

I'm not sure why you want to convert to a pandas DataFrame; using the Spark DataFrame created from your Spark SQL, you can write directly to CSV.

However, if you want the CSV as one file, you will need to repartition to 1, which will not use all nodes. If you are not bothered about how many files it generates, you can repartition the DataFrame to include more partitions. Each partition would then be processed by the nodes and written out until all partitions are complete.

Single file, not using all nodes (note: the .csv path will be a folder containing the actual CSV):

df_gl = spark.sql("select * from {} where category = {}".format(tableName, cat))
df_gl.repartition(1).write.mode("overwrite").csv("/tmp/split/{}_{}.csv".format(tableName, cat))

Parallel processing using multiple nodes and outputting as multiple split files (note: the .csv path will be a folder containing the actual CSVs):

df_gl = spark.sql("select * from {} where category = {}".format(tableName, cat)).repartition(10)
df_gl.write.mode("overwrite").csv("/tmp/split/{}_{}.csv".format(tableName, cat))
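
Since splitting by category is the real goal, here is a related sketch using Spark's built-in partitionBy on the DataFrameWriter, which writes every category in one parallel job instead of looping over catList. It assumes the source view has a CATEGORY column, as in your main loop, and the output path is illustrative:

# One parallel write per table: each CATEGORY value lands in its own
# subfolder, e.g. /tmp/split/Table1/CATEGORY=14/part-*.csv
df = spark.sql("select * from {}".format(table))
df.write.partitionBy("CATEGORY").mode("overwrite").csv("/tmp/split/{}".format(table))

Each partition's files are written by the executors in parallel, so all 30 nodes can participate.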

Upvotes: 1
