Explorer

Reputation: 1647

caching and reusing pyspark dataframe in loop

I have 2 scripts and a use case where I need to create a dataframe in one script and reuse it in the other inside a loop, something like below:

Script 1:


import sys

from pyspark.sql import SparkSession


def generate_data(spark, logger, conf):
    processed_data_final = None
    path_1 = conf["raw_data_path_1"]
    path_2 = conf["raw_data_path_2"]
    df_path1 = spark.read.parquet(path_1)
    df_path1.cache()
    df_path1.take(1)  # trigger an action, since Spark evaluates lazily
    df_path2 = spark.read.parquet(path_2)
    df_path2.cache()
    df_path2.take(1)
    for dt in date_list:
        processed_data = process_data(spark, logger, conf, dt, df_path1, df_path2)
        if processed_data_final is None:
            processed_data_final = processed_data
        else:
            processed_data_final = processed_data_final.union(processed_data)

    return processed_data_final


if __name__ == "__main__":
    # generate global variables: spark, logger
    if 5 == len(sys.argv):
        env = sys.argv[1]
        job_id = sys.argv[2]

    else:
        print("missing parameters: env or job_id")
        sys.exit(1)

    app_name = "past_viewership_" + job_id
    spark = SparkSession \
        .builder \
        .appName(app_name) \
        .config("spark.storage.memoryFraction", 0) \
        .config("spark.driver.maxResultSize", "-1") \
        .getOrCreate()
    sc = spark.sparkContext

    # logger and conf are set up here in the real script (omitted for brevity)
    generate_data(spark, logger, conf)

In script 2 I reuse the dataframes from script 1 like this:


def process_data(spark, logger, conf, dt, df_path1, df_path2):
    path3 = conf["path3"]
    df3 = spark.read.parquet(path3)
    res_df = df3.join(df_path1, ["id"], "inner").join(df_path2, ["id"], "inner")
    return res_df

This is rough code explaining the flow. In the logs I see that df_path1 and df_path2 are loaded again inside the loop, whereas I was expecting the cached dataframes to be used. How can I avoid re-reading df_path1 and df_path2 in the loop?

Upvotes: 0

Views: 503

Answers (1)

Desmond Cheong

Reputation: 836

Calling dataframe.take(1) does not materialize the entire dataframe. Spark's Catalyst optimizer modifies the physical plan to read only the first partition, since only the first record is needed. As a result, only that first partition ends up in the cache; the remaining partitions are not cached until an action actually reads them.
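
A minimal sketch of the adjustment, reusing the conf keys and dataframe names from the question: replacing take(1) with count() forces a full scan, so every partition is materialized into the cache and the joins inside the loop no longer re-read the parquet files.

df_path1 = spark.read.parquet(conf["raw_data_path_1"]).cache()
df_path2 = spark.read.parquet(conf["raw_data_path_2"]).cache()

# count() touches every partition, so the whole dataframe lands in the cache;
# take(1) only materializes (and therefore caches) the first partition
df_path1.count()
df_path2.count()

# optional sanity check: prints the storage level requested for each dataframe
print(df_path1.storageLevel)
print(df_path2.storageLevel)

Any action that scans the whole dataframe works here; count() is simply a cheap, common choice. You can also check the Storage tab of the Spark UI to confirm that the cached size corresponds to the full dataset rather than a single partition.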

Upvotes: 1
