Ismail H

Reputation: 4479

Get the difference between two versions of a Delta Lake table

How can I find the difference between the last two versions of a Delta table? Here is how far I got using DataFrames:

val df1 = spark.read
  .format("delta")
  .option("versionAsOf", 1L)
  .load("/path/to/my/table")

val df2 = spark.read
  .format("delta")
  .option("versionAsOf", 2L)
  .load("/path/to/my/table")

// non-idiomatic way to do it: rows present in exactly one of the two versions
// (union replaces the deprecated unionAll; except also drops duplicate rows)
df1.union(df2).except(df1.intersect(df2))
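A minimal sketch of a more explicit alternative, assuming Spark 2.4+ (for exceptAll): instead of one symmetric difference, compute what each version has that the other lacks and tag every row with its side. The in-memory v1/v2 DataFrames are illustrative stand-ins for df1/df2 so the snippet runs without a Delta table; the names v1, v2, versionDiff and the change column are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

val spark = SparkSession.builder().master("local[*]").appName("version-diff").getOrCreate()
import spark.implicits._

// Stand-ins for df1 (older version) and df2 (newer version)
val v1 = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "value")
val v2 = Seq((1, "a"), (2, "B"), (4, "d")).toDF("id", "value")

// exceptAll is a multiset difference: unlike except, it keeps duplicate rows
val added   = v2.exceptAll(v1).withColumn("change", lit("add"))
val removed = v1.exceptAll(v2).withColumn("change", lit("remove"))

val versionDiff = added.union(removed)
versionDiff.show()
```

An updated row (id 2 here) shows up twice: once as remove with its old values and once as add with its new values.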

There is a commercial version of Delta by Databricks that provides a solution called CDF (Change Data Feed), but I'm looking for an open-source alternative.

Upvotes: 5

Views: 3158

Answers (2)

Ismail H

Reputation: 4479

Using @Zinking's comment, I managed to get a DataFrame containing the difference between two versions:

1) Get the latest version:

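import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions.col

// history() lists commits newest first, so history(1) holds only the latest one
val lastVersion: Long = DeltaTable.forPath(spark, PATH_TO_DELTA_TABLE)
    .history(1)
    .select(col("version"))
    .collect()
    .headOption
    .map(_.getLong(0))
    .getOrElse(throw new Exception("Is this table empty?"))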
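With the latest version known, the 0000NUMVERSION.json placeholder used in the next step can be built rather than typed by hand. A small sketch, assuming the standard Delta log layout in which each commit file is named after its version zero-padded to 20 digits; commitFileName and commitFilePath are hypothetical helpers, not part of the Delta API.

```scala
// Hypothetical helper: commit file name for a version,
// e.g. version 3 -> 00000000000000000003.json
def commitFileName(version: Long): String = f"$version%020d.json"

// Hypothetical helper: full path of that commit file under the table root
def commitFilePath(tableRoot: String, version: Long): String =
  s"${tableRoot.stripSuffix("/")}/_delta_log/${commitFileName(version)}"

println(commitFilePath("/path/to/my/table", 3L))
```

The commit file for version N already records the change from N-1 to N, so diffing the last two versions only needs commitFilePath(root, lastVersion).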

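2) Get the lists of Parquet files flagged as "added" or "removed" in a specific version. 0000NUMVERSION stands for that version; actual commit files are named with the version zero-padded to 20 digits (e.g. 00000000000000000002.json):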

// Read the commit file of the version to inspect; ROOT_PATH is the table root
val commitLog = spark
    .read
    .json(s"$ROOT_PATH/_delta_log/0000NUMVERSION.json")

// add.path and remove.path are relative to the table root, so prefix them
val addPathList = commitLog
    .where("add is not null")
    .select("add.path")
    .collect()
    .map(row => s"$ROOT_PATH/${row.getString(0)}")
    .toList
val removePathList = commitLog
    .where("remove is not null")
    .select("remove.path")
    .collect()
    .map(row => s"$ROOT_PATH/${row.getString(0)}")
    .toList
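One pitfall with the step above: spark.read.json only infers columns for the action types actually present in the file, so a commit that removes nothing has no remove column and filtering on it fails. A sketch of a guard, assuming Spark 2.2+ (for reading JSON from a Dataset[String]); actionPaths is a hypothetical helper, and the two-line commit below is a trimmed, illustrative example rather than a complete Delta commit.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").appName("delta-log-paths").getOrCreate()
import spark.implicits._

// Hypothetical helper: relative file paths for one action type ("add" or "remove").
// Returns Nil when the commit contains no action of that type (column not inferred).
def actionPaths(commitLog: DataFrame, action: String): List[String] =
  if (!commitLog.columns.contains(action)) Nil
  else commitLog
    .where(col(action).isNotNull)
    .select(col(s"$action.path"))
    .collect()
    .map(_.getString(0))
    .toList

// Illustrative commit with only an "add" action (most real fields omitted)
val commitLog = spark.read.json(Seq(
  """{"commitInfo":{"operation":"WRITE"}}""",
  """{"add":{"path":"part-00000-a.snappy.parquet","dataChange":true}}"""
).toDS())

val addPaths = actionPaths(commitLog, "add")
val removePaths = actionPaths(commitLog, "remove")
```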

3) Load them into DataFrames:

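import org.apache.spark.sql.functions._

// Tag each row with the action of the file it came from.
// Removed files stay readable only until VACUUM deletes them from storage.
val addDF = spark
  .read
  .format("parquet")
  .load(addPathList: _*)
  .withColumn("add_remove", lit("add"))
val removeDF = spark
  .read
  .format("parquet")
  .load(removePathList: _*)
  .withColumn("add_remove", lit("remove"))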
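If a commit adds or removes no files, one of the path lists is empty, and Spark may then be unable to infer a Parquet schema from zero files. A sketch of a guard, assuming the table schema is known to the caller; loadTagged is a hypothetical helper and the one-column schema is purely illustrative (the real one could come from spark.read.format("delta").load(PATH_TO_DELTA_TABLE).schema).

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession.builder().master("local[*]").appName("tagged-load").getOrCreate()

// Hypothetical helper: load Parquet files and tag every row with `tag`.
// With no paths, build an empty DataFrame from the supplied schema instead.
def loadTagged(paths: Seq[String], schema: StructType, tag: String): DataFrame = {
  val df =
    if (paths.isEmpty) spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
    else spark.read.schema(schema).parquet(paths: _*)
  df.withColumn("add_remove", lit(tag))
}

// Illustrative schema only; the real table's columns and types may differ
val tableSchema = StructType(Seq(StructField("updatedate", StringType)))

val emptyRemoveDF = loadTagged(Nil, tableSchema, "remove")
```

Passing the schema explicitly also avoids a schema-inference pass over the listed files.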

4) The union of both DataFrames represents the "diff":

addDF.union(removeDF).show()


+----------+----------+
|updatedate|add_remove|
+----------+----------+
|      null|       add|
|      null|       add|
|      null|       add|
|      null|       add|
|      null|       add|
|      null|       add|
|      null|       add|
|      null|       add|
|      null|       add|
|      null|       add|
|      null|       add|
|      null|       add|
|      null|       add|
|      null|       add|
|      null|       add|
|      null|       add|
|      null|       add|
|      null|       add|
|      null|       add|
|      null|       add|
+----------+----------+
only showing top 20 rows
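Because Delta rewrites whole Parquet files, an UPDATE or DELETE touching a single row removes that file and adds a new one that repeats every untouched row, so those rows appear in the union once as add and once as remove. A sketch for cancelling them out, assuming Spark 2.4+ for exceptAll; addDF and removeDF are rebuilt here from toy data so the snippet runs on its own.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

val spark = SparkSession.builder().master("local[*]").appName("cancel-unchanged").getOrCreate()
import spark.implicits._

// Toy stand-ins for step 3: the old file held rows 1-3, the rewrite changed row 3 only
val addDF    = Seq((1, "a"), (2, "b"), (3, "C")).toDF("id", "value").withColumn("add_remove", lit("add"))
val removeDF = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "value").withColumn("add_remove", lit("remove"))

// Compare data columns only; rows on both sides were carried over unchanged
val added   = addDF.drop("add_remove")
val removed = removeDF.drop("add_remove")

val netDiff = added.exceptAll(removed).withColumn("add_remove", lit("add"))
  .union(removed.exceptAll(added).withColumn("add_remove", lit("remove")))
netDiff.show()
```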

Upvotes: 2

fidelin

Reputation: 320

This returns a DataFrame describing the differences between the two versions (using G-Research's spark-extension library):

import uk.co.gresearch.spark.diff.DatasetDiff

// df1 and df2 are the two versions loaded with versionAsOf in the question
df1.diff(df2)
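A sketch of the same call with an id column, assuming spark-extension is on the classpath. As I understand the library's documentation, naming id columns lets it report a modified row as a single changed row, with a diff column holding N (no change), I (insert), D (delete) or C (change), rather than as a delete plus an insert. The toy left/right DataFrames stand in for the two table versions.

```scala
import org.apache.spark.sql.SparkSession
import uk.co.gresearch.spark.diff._

val spark = SparkSession.builder().master("local[*]").appName("spark-extension-diff").getOrCreate()
import spark.implicits._

// Stand-ins for the two table versions (e.g. df1 and df2 from the question)
val left  = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "value")
val right = Seq((1, "a"), (2, "B"), (4, "d")).toDF("id", "value")

// "id" identifies a row across versions, so id 2 should be reported as changed
val changes = left.diff(right, "id")
changes.show()
```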

Upvotes: 2
