Satya

Reputation: 5907

pyspark temptable behaviour

I want to achieve the following: I have some BASE data, and I receive incremental data.

I combine both and run some operation (a SQL query). On success, BASE = BASE + incremental for the next run.

On exception, the base data should remain BASE_data (the incremental data must not become part of it).

I have tried to explain this through the code below.

I am confused by the behavior of Spark temp tables.

# Read 2 files and persist them in MEMORY_ONLY
df = spark.read.csv('BASE_data.csv', header=True)
df.persist()
print(df.count())  # o/p: 4
df1 = spark.read.csv('data.csv', header=True)
df1.persist()
print(df1.count())  # o/p: 4

# Register the temp tables
df.registerTempTable('BASE_data')
spark.sql('select count(1) from BASE_data').show()  # 4, which is fine
# Append the rows of df1 to df (BASE_data) and register the result as combined_data
spark.sql("select * from {0}".format('BASE_data')).union(df1).registerTempTable('combined_data')
spark.sql('select count(1) from combined_data').show()  # 8, which is fine too

# Now unpersist df1 from memory and also rebind the variable
df1.unpersist()
df1 = []
spark.sql('select count(1) from combined_data').show()
# o/p: 8. I am confused here; I expected 4.
# After unpersisting, Spark might rebuild df1 by re-reading the file,
# so to be doubly sure I reassigned df1 to an empty list.

I need help understanding this behavior and how I can achieve this functionality.

I am planning the simple approach below, unless there is a better one:

-- I do not want to keep both BASE_data and combined_data. Can I achieve this with a single temp table definition? spark.sql("select * from {0}".format('BASE_data')).union(df1).registerTempTable('BASE_data')

-- I do not want to create something that becomes unused after a while and consumes memory during execution. On exception, BASE_data should fall back to the original BASE_data, i.e. the new additional data (df1) should be removed from BASE_data on exception or by unpersisting it.

Please let me know if anything is unclear and I will try my best to explain. Thanks.

try:
    # create combined_data by union
    # do SQL ops
    # BASE_data = select * from combined_data
    pass
except Exception:
    # BASE_data = BASE_data, i.e. basically do nothing
    pass

Also, please help me with what kind of cleanup I can do in the except block for whatever junk I might have created in the try block. I am really concerned about memory management. Thanks.

Upvotes: 0

Views: 1786

Answers (1)

Michail N

Reputation: 3835

You union df1 with df (which was registered as the temp table 'BASE_data') and then register the result as a temp table named combined_data. The method registerTempTable() is not an action: it does not evaluate the DAG (directed acyclic graph) or copy any data, it only registers the DataFrame's query plan under that name. That plan still reads both BASE_data.csv and data.csv, so it is independent of the Python variable df1 and of the cache. df1.unpersist() only drops the cached blocks and df1=[] only rebinds the name, so a later query on combined_data re-evaluates the plan from the files and still sees all 8 rows.

I do not understand why you expect the second count to be 4 instead of 8, which is the correct result. The table is defined on the line where you do the union and does not change after that, so the result does not change.
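For the fallback you describe (keep BASE on failure, promote BASE + incremental on success), one possible pattern is to build the combined data under a separate name and only swap it in once the operation has succeeded. The sketch below is an assumption about how you might structure it, not a Spark API: run_with_fallback, combine and operation are hypothetical names, and plain lists stand in for DataFrames so it runs without Spark.

```python
def run_with_fallback(base, incremental, combine, operation):
    """Return (new_base, result); new_base is the old base if anything fails."""
    try:
        combined = combine(base, incremental)
        result = operation(combined)
    except Exception as exc:
        # Nothing was swapped in, so the old base is still valid.
        print("operation failed, keeping old base: %s" % exc)
        return base, None
    return combined, result


# Plain lists stand in for DataFrames (illustrative data only).
base = [1, 2, 3, 4]
incremental = [5, 6, 7, 8]

new_base, total = run_with_fallback(base, incremental, lambda b, i: b + i, sum)
print(len(new_base), total)  # 8 36


def failing_operation(rows):
    # Simulates a SQL step that raises.
    raise ValueError("bad batch")


kept_base, result = run_with_fallback(
    base, incremental, lambda b, i: b + i, failing_operation
)
print(len(kept_base), result)  # 4 None
```

With DataFrames, combine could be lambda b, i: b.union(i), and after a successful run you would re-register the returned DataFrame under 'BASE_data'. On failure there is little to clean up: unpersist() anything you persisted yourself and, if you registered an intermediate view, remove it with spark.catalog.dropTempView('combined_data').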

Upvotes: 1
