Reputation: 1647
I have 2 scripts and a use case where I need to create a dataframe in one script and use it in another, inside a loop. Something like below:
Script 1:
import sys

from pyspark.sql import SparkSession

def generate_data(spark, logger, conf):
    processed_data_final = None
    path_1 = conf["raw_data_path_1"]
    path_2 = conf["raw_data_path_2"]
    df_path1 = spark.read.parquet(path_1)
    df_path1.cache()
    df_path1.take(1)  # calling an action, since Spark evaluates lazily
    df_path2 = spark.read.parquet(path_2)
    df_path2.cache()
    df_path2.take(1)
    for dt in date_list:  # date_list is defined elsewhere
        processed_data = process_data(spark, logger, conf, dt, df_path1, df_path2)
        if processed_data_final is None:
            processed_data_final = processed_data
        else:
            processed_data_final = processed_data_final.union(processed_data)
    return processed_data_final
if __name__ == "__main__":
    # generate global variables: spark, logger
    if 5 == len(sys.argv):
        env = sys.argv[1]
        job_id = sys.argv[2]
    else:
        print("parameter {env} or {job_id}")
        exit(1)
    app_name = "past_viewership_" + job_id
    spark = SparkSession \
        .builder \
        .appName(app_name) \
        .config("spark.storage.memoryFraction", 0) \
        .config("spark.driver.maxResultSize", "-1") \
        .getOrCreate()
    sc = spark.sparkContext
    generate_data(spark, logger, conf)
In script 2 I reuse the dataframes from script 1 like:
def process_data(spark, logger, conf, dt, df_path1, df_path2):
    path3 = conf['path3']
    df3 = spark.read.parquet(path3)
    res_df = df3.join(df_path1, ["id"], "inner").join(df_path2, ["id"], "inner")
    return res_df
This is rough code explaining the flow. In this flow I see in the logs that df_path1 and df_path2 are being loaded again inside the loop; I was expecting the cached dataframes to be used. How can I avoid reading df_path1 and df_path2 again in the loop?
Upvotes: 0
Views: 503
Reputation: 836
Calling dataframe.take(1) does not materialize the entire dataframe. Spark's Catalyst optimizer will modify the physical plan to only read the first partition of the dataframe, since only the first record is needed. Hence, only the first partition is cached until the rest of the records are read.
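One way to get the whole dataframes into the cache before the loop starts is to trigger an action that scans every partition, such as count(), instead of take(1). A minimal sketch, keeping the structure and names from script 1:

df_path1 = spark.read.parquet(path_1)
df_path1.cache()
df_path1.count()  # full scan, so every partition ends up in the cache

df_path2 = spark.read.parquet(path_2)
df_path2.cache()
df_path2.count()

for dt in date_list:
    # the joins inside process_data should now reuse the cached data
    processed_data = process_data(spark, logger, conf, dt, df_path1, df_path2)

The trade-off is that count() pays the cost of reading both parquet sources up front, which is only worthwhile if the loop reuses them more than once.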
Upvotes: 1