Reputation: 5907
I want to achieve the following functionality: I have some BASE data and I will keep getting incremental data.
I combine both and perform some operation (a SQL query); on success I will have BASE = BASE + incremental for the next run.
On exception, my base data should stay as the original BASE data (the incremental batch should not be part of it).
I have tried explaining this through the code below.
I am confused by the behavior of Spark temp tables...
from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# I am reading 2 files and persisting them in MEMORY_ONLY
df = spark.read.csv('BASE_data.csv', header=True)
df.persist(StorageLevel.MEMORY_ONLY)
print(df.count())   # output: 4
df1 = spark.read.csv('data.csv', header=True)
df1.persist(StorageLevel.MEMORY_ONLY)
print(df1.count())  # output: 4

# I register the base data as a temp table
df.registerTempTable('BASE_data')
spark.sql('select count(1) from BASE_data').show()  # 4, which is fine

# I append the rows from df1 to df (BASE_data) and register the result as combined_data
spark.sql("select * from {0}".format('BASE_data')).union(df1).registerTempTable('combined_data')
spark.sql('select count(1) from combined_data').show()  # 8, which is fine too

# Now I unpersist df1 from memory and also rebind the variable
df1.unpersist()
df1 = []
spark.sql('select count(1) from combined_data').show()
# output is 8; I am confused here, I expected 4.
# When I unpersisted, Spark might try to rebuild df1 by re-reading that file,
# so to be doubly sure I reassigned df1 to an empty list.
I need help understanding this behavior and how I can achieve this functionality.
I am planning the simple approach below; please tell me if there is a better one.
-- I do not want to keep both a BASE_data and a combined_data temp table. Can I achieve this with a single temp table definition, e.g. spark.sql("select * from {0}".format('BASE_data')).union(df1).registerTempTable('BASE_data')? (See the sketch after these two points.)
-- I do not want to create something that becomes unused after a while and still consumes memory during execution. On exception, BASE_data should fall back to the original BASE_data, i.e. the new incremental data (df1) should be removed from BASE_data, either in the exception handling or by unpersisting it.
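For the first point, this is roughly what I mean (just a sketch; I am not sure whether re-registering over the same name like this is safe):

combined = spark.sql('select * from BASE_data').union(df1)
# ... run the SQL ops / actions on `combined` here; if any of them throws,
# the next line never runs and 'BASE_data' keeps its old definition ...
combined.registerTempTable('BASE_data')  # only on success: BASE_data = BASE + incremental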
Please let me know if anything is not clear; I will try my best to explain. Thanks.
try:
    # create combined_data by union
    spark.sql('select * from BASE_data').union(df1).registerTempTable('combined_data')
    # ... do the SQL ops on combined_data here ...
    # on success: BASE_data = select * from combined_data
    spark.sql('select * from combined_data').registerTempTable('BASE_data')
except Exception:
    # BASE_data was never re-registered, so it is still the original BASE data
    pass
Also, please help me with what kind of cleanup I can do in the except block for whatever junk I might have created in the try block. I am really concerned about memory management. Thanks.
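For example, is something like this the right kind of cleanup in the except block? (spark.catalog.dropTempView and unpersist are just my guesses, assuming Spark 2.x.)

# inside the except block: get rid of whatever the try block created
spark.catalog.dropTempView('combined_data')  # remove the intermediate view, if it got registered
df1.unpersist()                              # drop the cached incremental batch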
Upvotes: 0
Views: 1786
Reputation: 3835
You union df1 with df (which was registered under the temp table name 'BASE_data') and register the result as combined_data. registerTempTable() is not an action and it does not copy any rows: it only stores the DataFrame's logical plan (the DAG, directed acyclic graph) in the catalog under that name. When you later run the count, Spark evaluates that plan, which is still the union of the two CSV sources. Unpersisting df1 only removes its cached blocks, so Spark simply recomputes those rows by re-reading data.csv, and reassigning the Python variable (df1 = []) only rebinds the name; the registered plan keeps its own reference to the original data.
I do not understand why you expect the second count to return 4 instead of 8, which is the correct result. The definition of combined_data is fixed on the line where you do the union and register it; nothing you do to the df1 variable or its cache afterwards changes that definition, so the result will not change.
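If you want to convince yourself of this, you can inspect the plan behind the temp table (a quick check, re-using your own names; the exact output depends on your Spark version):

# the temp table is only a named logical plan, not a copy of the rows;
# explain() should show that the plan still scans both CSV files,
# regardless of whether df1 is cached or the Python variable still exists
spark.table('combined_data').explain(True)
spark.sql('select count(1) from combined_data').show()  # 8, recomputed from data.csv if needed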
Upvotes: 1