Reputation: 335
I'm trying to add a new column to data stored as a Delta Table in Azure Blob Storage. Most of the actions being done on the data are upserts, with many updates and few new inserts. My code to write data currently looks like this:
DeltaTable.forPath(spark, deltaPath)
  .as("dest_table")
  .merge(myDF.as("source_table"), "dest_table.id = source_table.id")
  .whenNotMatched()
  .insertAll()
  .whenMatched(upsertCond)
  .updateExpr(upsertStat)
  .execute()
From these docs, it looks like Delta Lake supports adding new columns on insertAll() and updateAll() calls only. However, I'm updating only when certain conditions are met and want the new column added to all the existing data (with a default value of null).
I've come up with a solution that seems extremely clunky and am wondering if there's a more elegant approach. Here's my current proposed solution:
// Read in existing data
val myData = spark.read.format("delta").load(deltaPath)
// Register table with Hive metastore
myData.write.format("delta").saveAsTable("input_data")
// Add new column
spark.sql("ALTER TABLE input_data ADD COLUMNS (new_col string)")
// Save as DataFrame and overwrite data on disk
val sqlDF = spark.sql("SELECT * FROM input_data")
sqlDF.write.format("delta").option("mergeSchema", "true").mode("overwrite").save(deltaPath)
Upvotes: 13
Views: 58221
Reputation: 7615
This is the approach that worked for me, using Scala.
Given a Delta table named original_table, whose path is:
val path_to_delta = "/mnt/my/path"
This table currently has 1M records with the following schema: pk, field1, field2, field3, field4.
I want to add a new field, named newfield, to the existing schema without losing the data already stored in original_table.
So I first defined a simple schema containing just pk and newfield:
case class new_schema(
  pk: String,
  newfield: String
)
I created a dummy record using that schema:
import spark.implicits._
val dummy_record = Seq(new new_schema("delete_later", null)).toDF
I appended this record with mergeSchema enabled, so the existing 1M records get newfield populated as null. I then deleted the dummy record from the original table:
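import io.delta.tables.DeltaTable

// Append the dummy record; mergeSchema adds newfield to the table schema
dummy_record
  .write
  .format("delta")
  .option("mergeSchema", "true")
  .mode("append")
  .save(path_to_delta)

// Remove the dummy record again
val original_dt: DeltaTable = DeltaTable.forPath(spark, path_to_delta)
original_dt.delete("pk = 'delete_later'")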
Now the original table will have 6 fields: pk, field1, field2, field3, field4 and newfield.
Finally, I upsert the newfield values into the corresponding 1M records, using pk as the join key:
import org.apache.spark.sql.functions.col

val df_with_new_field = // You bring new data from somewhere...

original_dt
  .as("original")
  .merge(
    df_with_new_field.as("new"),
    "original.pk = new.pk")
  .whenMatched
  .update(Map("newfield" -> col("new.newfield")))
  .execute()
https://www.databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html
Upvotes: 0
Reputation: 1779
New columns can also be added with SQL commands as follows:
ALTER TABLE dbName.TableName ADD COLUMNS (newColumnName dataType);
UPDATE dbName.TableName SET newColumnName = val;
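For a table that is only addressed by its storage path (as in the question) rather than registered in a metastore, the same ALTER TABLE can likely be pointed at the path directly using the delta.`<path>` form. Here is a minimal sketch, assuming an active SparkSession with the Delta Lake SQL extension enabled. The helper name add_nullable_column and its parameters are made up for illustration and are not part of any library:

```python
def add_nullable_column(spark, delta_path, column, data_type="STRING"):
    """Add `column` to the path-based Delta table stored at `delta_path`.

    `spark` is assumed to be an active SparkSession configured for Delta Lake.
    ADD COLUMNS only changes the table schema in the transaction log, so
    existing rows simply read the new column as null.
    """
    statement = (
        f"ALTER TABLE delta.`{delta_path}` "
        f"ADD COLUMNS ({column} {data_type})"
    )
    spark.sql(statement)
    # Return the statement so callers can log what was executed.
    return statement
```

It would be called once before the merge, e.g. add_nullable_column(spark, deltaPath, "new_col"). Since the existing rows then already read new_col as null, the UPDATE step is only needed if you want a non-null default.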
Upvotes: 14
Reputation: 192
Alter your Delta table first, and then do your merge operation:
from pyspark.sql.functions import lit

spark.read.format("delta").load('/mnt/delta/cov')\
    .withColumn("Recovered", lit(''))\
    .write\
    .format("delta")\
    .mode("overwrite")\
    .option("overwriteSchema", "true")\
    .save('/mnt/delta/cov')
Upvotes: 15
Reputation: 7
Have you tried using the merge statement?
https://docs.databricks.com/spark/latest/spark-sql/language-manual/merge-into.html
Upvotes: -2