Reputation: 477
I tried PyArrow as well. In my example I got the Spark dataframe using a spark.sql statement, after which I wanted to convert it to a pandas dataframe. To show the execution times I ran the statements below.
import time
startTime = time.time()
df=df.toPandas()
executionTime = (time.time() - startTime)
executionTime
This gave 1021.55 seconds.
I also tried
import time
startTime = time.time()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
df=df.toPandas()
executionTime = (time.time() - startTime)
executionTime
This gave 1008.71 seconds.
To give a brief idea, the dataframe's shape is (944, 5). Below are the data types in the Spark dataframe:
import pandas as pd
pd.set_option('max_colwidth', -1)  # to prevent truncating of columns in Jupyter

def count_column_types(spark_df):
    """Count number of columns per type"""
    return pd.DataFrame(spark_df.dtypes).groupby(1, as_index=False)[0].agg(
        {'count': 'count', 'names': lambda x: " | ".join(set(x))}).rename(columns={1: "type"})

count_column_types(df)
            type  count  names
0         bigint      1   col4
1           date      1   col1
2  decimal(20,4)      1   col5
3            int      1   col2
4         string      1   col3
Please let me know if there is any way I can make this more efficient.
Upvotes: 1
Views: 978
Reputation: 87109
The spark.sql.execution.arrow.pyspark.enabled setting has an effect if you're using so-called Pandas UDFs, but not in your case.
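For reference, this is roughly what a Pandas UDF looks like; there Arrow is used to move column batches between the JVM and the Python worker. Just a minimal sketch against your col4 column, assuming Spark 3.x:

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("long")
def times_two(s: pd.Series) -> pd.Series:
    # receives batches of rows as a pandas Series, transferred via Arrow
    return s * 2

df.withColumn("col4_doubled", times_two(df["col4"]))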
Your problem is that toPandas needs to collect all data from the executors to the driver node, but before that it needs to execute your SQL query, and the main bottleneck could be there (you didn't show the query, so it's hard to say). You can try to find out where the bottleneck is: in the SQL query execution, or really in toPandas. To do that, try something like:
df = spark.sql(....)
import time
startTime = time.time()
df.write.format("noop").mode("overwrite").save()
executionTime = (time.time() - startTime)
executionTime
and compare this execution time with the time that you get from toPandas.
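One way to do the comparison in a single run, sticking with the time.time() pattern from your question (a sketch; df is the same dataframe you already built with spark.sql):

import time

start = time.time()
df.write.format("noop").mode("overwrite").save()   # runs the full query, writes nothing
noop_time = time.time() - start

start = time.time()
pdf = df.toPandas()                                # runs the query again and collects to the driver
topandas_time = time.time() - start

print(f"noop write: {noop_time:.2f}s, toPandas: {topandas_time:.2f}s")

If the two numbers are close, the query itself is the bottleneck; if toPandas is much slower, the collect and conversion step is.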
Upvotes: 1