Reputation: 160
I'm trying to organize a couple of data transformations that are performed in PySpark. I have code similar to the example below.
import pyspark
from pyspark.sql import SparkSession


def main():
    spark_session = SparkSession\
        .builder\
        .appName(config.SPARK_CONFIG['AppName']) \
        .getOrCreate()

    data = getData(spark_session)
    analytics = Analytics(data)
    analytics.execute_and_save_analytics()
    spark_session.stop()


def getData(spark_session):
    sqlContext = pyspark.SQLContext(spark_session.sparkContext)
    return sqlContext.read.option('user', user).option('password', pswd)\
        .jdbc('jdbc:sqlserver://' + sqlserver + ':' + port
              + ';database=' + database, table)


class Analytics():
    def __init__(self, df):
        self.df = df

    def _execute(self):
        df0 = self.df.withColumn('col3', self.df.col31 + self.df.col32)
        # df0.persist()
        df1 = df0.filter(df0.col3 > 10).groupBy('col1', 'col2').count()
        df2 = df0.filter(df0.col3 < 10).groupBy('col1', 'col2').count()
        return df1, df2

    def execute_and_save_analytics(self):
        output_df1, output_df2 = self._execute()
        output_df1.coalesce(1).write.csv('/path/file.csv', header='true')
        output_df2.coalesce(1).write.csv('/path/file.csv', header='true')
How can I reorganize the code so that df0 is evaluated only once? I tried to use persist() as in the commented line, but there wasn't any performance improvement. Any thoughts?
Another, similar problem: how would you organize your pipelines if you had not one _execute() but many similar methods _execute1(), _execute2(), etc.? I suppose that if I call the _execute() methods separately, PySpark will evaluate each transformation pipeline separately (?), and therefore I lose performance.
edit: The given transformations (filter, groupBy, count) are only examples; I'm looking for a solution that works with any type of transformation or col3 definition.
edit2: It seems that calling cache() in __init__ is the best optimization here.
Upvotes: 1
Views: 85
Reputation: 35249
As it is (with persist commented out), df0 will be evaluated twice anyway. The structure of your code won't have any impact at all.
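For example (a minimal sketch, assuming df is the input DataFrame from your snippet), you can see the repeated work by comparing physical plans: without caching, each branch's plan repeats the source scan and the withColumn projection, while after cache() the same queries read from an InMemoryTableScan instead:

df0 = df.withColumn('col3', df.col31 + df.col32)

# Without caching, both physical plans repeat the scan and the projection:
df0.filter(df0.col3 > 10).groupBy('col1', 'col2').count().explain()
df0.filter(df0.col3 < 10).groupBy('col1', 'col2').count().explain()

# Once df0 is marked as cached, the same queries plan an InMemoryTableScan:
df0.cache()
df0.filter(df0.col3 > 10).groupBy('col1', 'col2').count().explain()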
Splitting your code into
def _execute_1(self):
    df0 = self.df.withColumn('col3', self.df.col31 + self.df.col32)
    df1 = df0.filter(df0.col3 > 10).groupBy('col1', 'col2').count()
    return df1

def _execute_2(self):
    df0 = self.df.withColumn('col3', self.df.col31 + self.df.col32)
    df2 = df0.filter(df0.col3 < 10).groupBy('col1', 'col2').count()
    return df2
won't make any difference. Without going into details on cache guarantees, you could:
def __init__(self, df):
    self.df = df.withColumn('col3', df.col31 + df.col32).cache()

def _execute_1(self):
    return self.df.filter(self.df.col3 > 10).groupBy('col1', 'col2').count()

def _execute_2(self):
    return self.df.filter(self.df.col3 < 10).groupBy('col1', 'col2').count()

def execute_and_save_analytics(self):
    output_df1 = self._execute_1()
    output_df2 = self._execute_2()
    output_df1.coalesce(1).write.csv('/path/file1.csv', header='true')
    output_df2.coalesce(1).write.csv('/path/file2.csv', header='true')
    self.df.unpersist()
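cache() is lazy, so col3 is actually computed during the first write and reused by the second, and unpersist() releases the cached data afterwards. As a quick check (a sketch, reusing getData() from your question), is_cached reflects the persistence set in __init__:

analytics = Analytics(getData(spark_session))
print(analytics.df.is_cached)   # True: the DataFrame is marked for caching
analytics.execute_and_save_analytics()
print(analytics.df.is_cached)   # False after unpersist()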
But it could be easier to just:
(self.df
    .withColumn('col3', self.df.col31 + self.df.col32 > 10)
    .repartition('col3')
    .write
    .partitionBy('col3')
    .csv('/path/file.csv', header='true'))
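If you still need the aggregated counts from the original _execute(), a similar single-pass variant (a sketch, assuming the >10 / <=10 boundary is acceptable in place of the strict >10 / <10 split) can group on the boolean flag as well and write once, so each side of the split lands in its own col3= directory:

# Counts for both sides of the split, computed in one shuffle:
counts = (self.df
    .withColumn('col3', self.df.col31 + self.df.col32 > 10)
    .groupBy('col3', 'col1', 'col2')
    .count())

# One write, partitioned by the boolean flag:
(counts
    .repartition('col3')
    .write
    .partitionBy('col3')
    .csv('/path/file.csv', header='true'))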
Upvotes: 1