Reputation: 1150
I am using the action count() to trigger my UDF to run. This works, but the df.count() keeps running for days, long after my UDF has finished its work. The dataframe itself is not large, only about 30k to 100k rows.
AWS Cluster Settings:
Spark Variables & Settings (these are the Spark options used to run the script):
--executor-cores 4
--conf spark.sql.execution.arrow.enabled=true
'spark.sql.inMemoryColumnarStorage.batchSize', 2000000 (set inside pyspark script)
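For reference, a minimal sketch of how the in-script setting above can be applied when building the SparkSession (the app name and builder pattern are illustrative assumptions, not our actual script):
from pyspark.sql import SparkSession

# Illustrative only: where the in-script batch-size config would be set.
spark = SparkSession.builder \
    .appName('udf_calculation') \
    .config('spark.sql.execution.arrow.enabled', 'true') \
    .config('spark.sql.inMemoryColumnarStorage.batchSize', 2000000) \
    .getOrCreate()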
Pseudo Code
Here is the actual structure of our script. The custom pandas UDF makes a call to a Postgres database for every row.
from pyspark.sql.functions import pandas_udf, PandasUDFType

# udf_schema: a function that returns the schema for the dataframe

def main():
    # Define the pandas UDF for the calculation.
    # To perform this calculation, every row in the
    # dataframe needs information pulled from our Postgres DB,
    # which does take some time, ~2-3 hours.
    @pandas_udf(udf_schema(), PandasUDFType.GROUPED_MAP)
    def calculate_values(local_df):
        local_df = run_calculation(local_df)
        return local_df

    # get_df: custom function that pulls data from our database
    # and creates the dataframe
    df = get_df()
    df = df\
        .groupBy('some_unique_id')\
        .apply(calculate_values)

    print(f'==> finished running calculation for {df.count()} rows!')
    return
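Since groupBy().apply() is a lazy transformation, the count() above is the action that actually executes the UDF. A minimal sketch of persisting the result before counting (the persist() call is illustrative and not part of our current script):
    # Illustrative: keep the UDF output cached so count() and any later
    # actions do not recompute the expensive calculation.
    df = df.persist()
    print(f'==> finished running calculation for {df.count()} rows!')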
Upvotes: 2
Views: 525
Reputation: 133
I ran into the same issue, but in my case the cause was a limitation of the JDBC read, not the cluster itself. If you are also reading from a database such as Impala or Postgres over JDBC, you can try the following:
df = spark.read.option("numPartitions", 100).option("partitionColumn", $COLUMN_NAME).jdbc($THE_JDBC_SETTING)
instead of
df = spark.read.jdbc($THE_JDBC_SETTING)
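For completeness, a fuller sketch of a partitioned JDBC read (the URL, table, and bounds below are placeholders; Spark's JDBC reader requires lowerBound and upperBound whenever partitionColumn is set):
# Placeholder connection details; adjust to your own database.
df = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://<host>:5432/<database>") \
    .option("dbtable", "<table>") \
    .option("partitionColumn", "<numeric_column>") \
    .option("lowerBound", 1) \
    .option("upperBound", 100000) \
    .option("numPartitions", 100) \
    .load()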
Upvotes: 1