Comrade_Question

Reputation: 335

How to add a new column to a Delta Lake table?

I'm trying to add a new column to data stored as a Delta Table in Azure Blob Storage. Most of the actions being done on the data are upserts, with many updates and few new inserts. My code to write data currently looks like this:

DeltaTable.forPath(spark, deltaPath)
      .as("dest_table")
      .merge(myDF.as("source_table"),
             "dest_table.id = source_table.id")
      .whenNotMatched()
      .insertAll()
      .whenMatched(upsertCond)
      .updateExpr(upsertStat)
      .execute()

From these docs, it looks like Delta Lake supports automatic schema evolution during a merge only for insertAll() and updateAll() actions. However, I update only when certain conditions are met, and I want the new column added to all of the existing data (with a default value of null).
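
For reference, the schema-evolution path from those docs looks roughly like the sketch below (assuming spark.databricks.delta.schema.autoMerge.enabled is set and that myDF already carries the new column); it doesn't fit my case because I only update when upsertCond holds:

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

DeltaTable.forPath(spark, deltaPath)
      .as("dest_table")
      .merge(myDF.as("source_table"),
             "dest_table.id = source_table.id")
      .whenMatched()
      .updateAll()      // schema evolution applies to updateAll()...
      .whenNotMatched()
      .insertAll()      // ...and insertAll(); new columns in myDF are added
      .execute()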

I've come up with a solution that seems extremely clunky and am wondering if there's a more elegant approach. Here's my current proposed solution:

// Read in existing data
val myData = spark.read.format("delta").load(deltaPath)
// Register table with Hive metastore
myData.write.format("delta").saveAsTable("input_data")

// Add new column
spark.sql("ALTER TABLE input_data ADD COLUMNS (new_col string)")

// Save as DataFrame and overwrite data on disk
val sqlDF = spark.sql("SELECT * FROM input_data")
sqlDF.write.format("delta").option("mergeSchema", "true").mode("overwrite").save(deltaPath)

Upvotes: 13

Views: 58221

Answers (4)

Ignacio Alorre

Reputation: 7615

This is the approach that worked for me, using Scala.

Suppose we have a Delta table named original_table, whose path is:

val path_to_delta = "/mnt/my/path"

This table currently has 1M records with the following schema: pk, field1, field2, field3, field4

I want to add a new field, named newfield, to the existing schema without losing the data already stored in original_table.

So I first defined a case class with a simple schema containing just pk and newfield:

case class new_schema(
  pk: String,
  newfield: String
)

I created a dummy record using that schema:

import spark.implicits._
val dummy_record = Seq(new_schema("delete_later", null)).toDF  // `new` is not needed for a case class

I appended this new record with schema merging enabled (the existing 1M records end up with newfield populated as null), and then removed the dummy record from the original table:

import io.delta.tables.DeltaTable

// append the dummy record; mergeSchema adds newfield to the table schema
dummy_record
  .write
  .format("delta")
  .option("mergeSchema", "true")
  .mode("append")
  .save(path_to_delta)

// then delete the dummy record again
val original_dt: DeltaTable = DeltaTable.forPath(spark, path_to_delta)
original_dt.delete("pk = 'delete_later'")

Now the original table will have 6 fields: pk, field1, field2, field3, field4 and newfield

Finally, I upsert the newfield values into the corresponding 1M records, using pk as the join key:

import org.apache.spark.sql.functions.col

val df_with_new_field = ??? // You bring new data from somewhere...

original_dt
  .as("original")
  .merge(
    df_with_new_field.as("new"),
    "original.pk = new.pk")
  .whenMatched
  .update(Map(
    "newfield" -> col("new.newfield")
  ))
  .execute()

https://www.databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html

Upvotes: 0

John Stud

Reputation: 1779

New columns can also be added with SQL commands as follows:

ALTER TABLE dbName.TableName ADD COLUMNS (newColumnName dataType);

UPDATE dbName.TableName SET newColumnName = val;
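
If you're running from a Spark job instead of a SQL console, the same statements can be issued through spark.sql() (a sketch in Scala; the table name, column name, and default value are placeholders):

spark.sql("ALTER TABLE dbName.TableName ADD COLUMNS (newColumnName STRING)")
spark.sql("UPDATE dbName.TableName SET newColumnName = 'someDefault'")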

Upvotes: 14

ashok gupta

Reputation: 192

Add the new column to your Delta table first, and then do your merge operation:

from pyspark.sql.functions import lit

# Read the existing data, add the new column with an empty-string default,
# and overwrite the table in place with the widened schema
spark.read.format("delta").load('/mnt/delta/cov')\
  .withColumn("Recovered", lit(''))\
  .write\
  .format("delta")\
  .mode("overwrite")\
  .option("overwriteSchema", "true")\
  .save('/mnt/delta/cov')
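
After that overwrite, the conditional merge from the question runs against the table with the extra column. A minimal sketch in Scala (updatesDF and the id join key are assumptions mirroring the question):

import io.delta.tables.DeltaTable

DeltaTable.forPath(spark, "/mnt/delta/cov")
  .as("dest")
  .merge(updatesDF.as("src"), "dest.id = src.id")
  .whenMatched()
  .updateExpr(Map("Recovered" -> "src.Recovered"))
  .whenNotMatched()
  .insertAll()
  .execute()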

Upvotes: 15
