Reputation: 695
I'm facing some schema evolution in my work process and I can't find a way to make it work. Last week I had 5 columns enabled in the ERP system, and the business required adding a 6th column to the table. My Delta table was created when I had only 5 columns, so I'm facing a problem with the MERGE INTO operation, which cannot deal with the extra column now. If I don't use the foreachBatch approach (whose function contains the MERGE INTO), I can solve the problem with df.writeStream.format("delta").option("overwriteSchema", True), which automatically adds the column. I tried to use that option together with foreachBatch, but the merge still fails, for obvious reasons.
My code:
from delta.tables import DeltaTable

def update_insert(df, epochId, cdm):
    deltaTable = DeltaTable.forPath(
        spark,
        f"abfss://{container_write}@{storage_write}.dfs.core.windows.net/D365/{cdm}" + "_ao",
    )
    (deltaTable.alias("table")
        .merge(df.alias("newData"), string)        # string: the merge condition
        .whenMatchedUpdate(set=dictionary)         # dictionary: column assignments
        .whenNotMatchedInsert(values=dictionary)
        .execute())

df.writeStream.format("delta") \
    .option("overwriteSchema", True) \
    .foreachBatch(lambda df, epochId: update_insert(df, epochId, cdm)) \
    .option("checkpointLocation", checkpoint_directory) \
    .trigger(availableNow=True) \
    .start() \
    .awaitTermination()
Preferably, I want to include that extra column in my Delta table too. How can I achieve that?
Upvotes: 1
Views: 710
Reputation: 87259
The mergeSchema or overwriteSchema options don't work with MERGE. Instead, you need to set the Spark conf property spark.databricks.delta.schema.autoMerge.enabled to true, as described in the MERGE documentation.
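For example, a minimal sketch based on the question's update_insert: the join condition table.id = newData.id is a hypothetical placeholder, and it uses whenMatchedUpdateAll/whenNotMatchedInsertAll, since those are the merge clauses that propagate new source columns when autoMerge is enabled.

from delta.tables import DeltaTable

# Enable automatic schema evolution for MERGE (session-wide setting).
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

def update_insert(df, epochId, cdm):
    deltaTable = DeltaTable.forPath(
        spark,
        f"abfss://{container_write}@{storage_write}.dfs.core.windows.net/D365/{cdm}" + "_ao",
    )
    # With autoMerge enabled, updateAll/insertAll add columns that exist
    # in the source DataFrame but not yet in the target Delta table.
    (deltaTable.alias("table")
        .merge(df.alias("newData"), "table.id = newData.id")  # hypothetical key
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())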
P.S. You don't need .format("delta").option("overwriteSchema", True) with foreachBatch...
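In other words, the streaming write can be reduced to something like this (checkpoint_directory and cdm as in the question; the lambda's batch argument is renamed to batch_df to avoid shadowing the outer df):

df.writeStream \
    .foreachBatch(lambda batch_df, epochId: update_insert(batch_df, epochId, cdm)) \
    .option("checkpointLocation", checkpoint_directory) \
    .trigger(availableNow=True) \
    .start() \
    .awaitTermination()

With foreachBatch, the sink logic lives entirely inside the batch function, so format/options for the Delta sink set on the stream writer itself have no effect.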
Upvotes: 3