viji

Reputation: 477

How to increase the efficiency of pyspark to pandas dataframe conversion other than PyArrow or with it

I tried PyArrow as well. In my example I obtained the Spark DataFrame using a spark.sql statement, after which I wanted to convert it to a pandas DataFrame. To show the execution times, I ran the statements below.

import time
startTime = time.time()
df=df.toPandas()
executionTime = (time.time() - startTime)
executionTime

This gave 1021.55 seconds.

I also tried

import time
startTime = time.time()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
df=df.toPandas()
executionTime = (time.time() - startTime)
executionTime

This gave 1008.71 seconds.
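As a side note, a small helper makes these measurements easier to repeat; a minimal sketch (`timed` is a hypothetical name, and `time.perf_counter` is generally preferred over `time.time` for measuring intervals):

```python
import time

def timed(fn, *args, **kwargs):
    """Run fn(*args, **kwargs) and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    return result, time.perf_counter() - start
```

With a live Spark session this would be called as `pdf, elapsed = timed(df.toPandas)`.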

To give a brief idea, the DataFrame's shape was (944, 5). Below are the data types in the Spark DataFrame:

import pandas as pd
pd.set_option('display.max_colwidth', None)  # prevent truncating of columns in Jupyter

def count_column_types(spark_df):
    """Count number of columns per type"""
    return (pd.DataFrame(spark_df.dtypes, columns=["name", "type"])
              .groupby("type", as_index=False)
              .agg(count=("name", "count"),
                   names=("name", lambda x: " | ".join(set(x)))))

count_column_types(df)

   type           count  names
0  bigint         1      col4
1  date           1      col1
2  decimal(20,4)  1      col5
3  int            1      col2
4  string         1      col3

Please let me know if there is any way I can increase the efficiency.

Upvotes: 1

Views: 978

Answers (1)

Alex Ott

Reputation: 87109

Setting spark.sql.execution.arrow.pyspark.enabled has an effect if you're using so-called Pandas UDFs, but not in your case.

Your problem is that toPandas needs to collect all the data from the executors to the driver node, but before that it needs to process your SQL query, and the main bottleneck could be there (you didn't show the query, so it's hard to say). You can try to work out where the bottleneck is: in the SQL query execution, or really in toPandas. To do that, try something like:

df = spark.sql(....)
import time
startTime = time.time()
df.write.format("noop").mode("overwrite").save()
executionTime = (time.time() - startTime)
executionTime

and compare this execution time with the time you get from toPandas.
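To make the comparison concrete: if the noop write already accounts for most of the full toPandas timing, the query itself is the bottleneck; otherwise it's the collection/conversion. A rough sketch (the `bottleneck` helper and its `threshold` value are illustrative assumptions, not part of the answer):

```python
def bottleneck(noop_seconds, to_pandas_seconds, threshold=0.8):
    """Guess where the time goes: 'query' if the noop write dominates
    the full toPandas timing, else 'conversion'."""
    return "query" if noop_seconds / to_pandas_seconds >= threshold else "conversion"

# e.g. if the noop write took 950 s out of the 1021.55 s total:
print(bottleneck(950.0, 1021.55))  # → query
```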

Upvotes: 1
