Megan

Reputation: 1150

PySpark + AWS EMR: df.count() taking a long time to complete

I am using the action count() to trigger my UDF to run. This works, but df.count() keeps running for days, long after my UDF has finished its work. The dataframe itself is not large, about 30k to 100k rows.

AWS Cluster Settings:

Spark Variables & Settings (these are the Spark settings used to run the script)

Pseudo Code

Here is the actual structure of our script. The custom pandas UDF makes a call to a Postgres database for every row.

from pyspark.sql.functions import pandas_udf, PandasUDFType

# udf_schema: A function that returns the schema for the dataframe

def main():
    # Define pandas udf for calculation
    # To perform this calculation, every row in the 
    # dataframe needs information pulled from our Postgres DB
    # which does take some time, ~2-3 hours
    @pandas_udf(udf_schema(), PandasUDFType.GROUPED_MAP)
    def calculate_values(local_df):
        local_df = run_calculation(local_df)
        return local_df

    # custom function that pulls data from our database and
    # creates the dataframe
    df = get_df()

    df = df\
        .groupBy('some_unique_id')\
        .apply(calculate_values)

    print(f'==> finished running calculation for {df.count()} rows!')

    return
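
For reference, here is a minimal sketch of what udf_schema() could return, assuming the grouped-map UDF outputs the grouping id plus one computed column (the actual column names and types are not shown in the original script):

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

def udf_schema():
    # Hypothetical schema; a GROUPED_MAP pandas UDF must declare the
    # full schema of the pandas DataFrame it returns.
    return StructType([
        StructField('some_unique_id', StringType(), nullable=False),
        StructField('calculated_value', DoubleType(), nullable=True),
    ])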

Upvotes: 2

Views: 525

Answers (1)

Ray

Reputation: 133

I ran into the same issue as yours, but in my case it was caused by a limitation of the JDBC read, not the cluster itself. So if, like me, you are reading from a database such as Impala or Postgres over JDBC, you can try the following (note that Spark requires lowerBound and upperBound to be set alongside partitionColumn and numPartitions):

df = spark.read\
    .option("numPartitions", 100)\
    .option("partitionColumn", $COLUMN_NAME)\
    .option("lowerBound", $LOWER_BOUND)\
    .option("upperBound", $UPPER_BOUND)\
    .jdbc($THE_JDBC_SETTING)

instead of

df = spark.read.jdbc($THE_JDBC_SETTING)
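
For a more concrete sketch, here is what a partitioned JDBC read against a Postgres database might look like. The URL, table name, partition column, bounds, and credentials below are illustrative assumptions, not values from the original question:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('partitioned-jdbc-read').getOrCreate()

# Hypothetical connection details; substitute your own.
jdbc_url = 'jdbc:postgresql://db-host:5432/mydb'

df = spark.read.jdbc(
    url=jdbc_url,
    table='my_table',
    column='id',          # numeric column Spark will range-partition on
    lowerBound=1,         # minimum value of the partition column
    upperBound=100000,    # maximum value of the partition column
    numPartitions=100,    # number of parallel JDBC connections
    properties={'user': 'my_user',
                'password': 'my_password',
                'driver': 'org.postgresql.Driver'},
)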

Upvotes: 1
