Reputation: 1
Here is my Spark and Iceberg configuration in the POM (I am sure these dependencies are correct, because I can insert data into an Iceberg table):
<!-- spark -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.1.1</version>
</dependency>
<!-- iceberg -->
<dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-spark-runtime-3.1_2.12</artifactId>
    <version>1.2.0</version>
</dependency>
I want to test the expireSnapshots and deleteOrphanFiles APIs. Before running them, I had already inserted data into an Iceberg table successfully. The following is the directory structure:
$ tree db
db
└── table_name
    ├── data
    │   └── year=2024
    │       └── month=04
    │           └── day=07
    │               └── dataType=PD
    │                   └── 00000-0-04211945-446e-4d17-83c4-7e8447f5e4e4-00001.parquet
    └── metadata
        ├── d2d68629-eb2f-4582-b2ed-ebb17f923f72-m0.avro
        ├── snap-8662509695078634239-1-d2d68629-eb2f-4582-b2ed-ebb17f923f72.avro
        ├── v1.metadata.json
        ├── v2.metadata.json
        └── version-hint.text
There are two metadata files because I first created the table and then inserted data into the partitions. The following is my program. My reasoning is: first I list all snapshot IDs, and there should be only one, since I inserted data only once. Then I call expireSnapshots, which should mark all snapshots as orphan and remove them from the metadata files. After that, I call deleteOrphanFiles, which should actually delete the data files. If I then refresh the table and list the snapshot IDs, I expect an empty list, and all files under the data folder should be deleted. But I still see these data files and can still query the snapshot ID. Can you help explain why? (I can make sure that no other Spark jobs, branches, or tags are using this table, and I inserted the data 7 hours ago.)
import scala.collection.JavaConverters.iterableAsScalaIterableConverter
import scala.concurrent.duration.DurationInt
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.hadoop.HadoopCatalog
import org.apache.iceberg.spark.actions.SparkActions
import org.apache.spark.sql.SparkSession
object ExpireSnapshotDemo {
  def main(args: Array[String]): Unit = {
    @transient implicit val spark = SparkSession
      .builder()
      .master("local[*]")
      .config("spark.driver.bindAddress", "127.0.0.1")
      .appName("IcebergTableCreationExample")
      .config(
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
      )
      .config(
        "spark.sql.catalog.local",
        "org.apache.iceberg.spark.SparkCatalog"
      )
      .config("spark.sql.catalog.local.type", "hadoop")
      .config(
        "spark.sql.catalog.local.warehouse",
        "/Users/Code/spark/iceberg"
      )
      .getOrCreate()

    val catalog = new HadoopCatalog(
      new Configuration(),
      "/Users/Code/spark/iceberg"
    )
    // with a Hadoop catalog, only the namespace and table name are needed
    val tableIdentifier = TableIdentifier.of("db", "table_name")
    // val tableFullName = "local.db.table_name"
    val table = catalog.loadTable(tableIdentifier)
    println(table.snapshots().asScala.map(_.snapshotId()).toList)

    val expireTimeMilliseconds = System.currentTimeMillis() - 2.hours.toMillis
    println(expireTimeMilliseconds)

    // remove expired snapshots from the table metadata
    SparkActions
      .get(spark)
      .expireSnapshots(table)
      .expireOlderThan(expireTimeMilliseconds)
      .execute()
    table.refresh()
    println(table.snapshots().asScala.map(_.snapshotId()).toList)

    // delete files that are no longer referenced by any table metadata
    SparkActions
      .get(spark)
      .deleteOrphanFiles(table)
      .olderThan(expireTimeMilliseconds)
      .execute()
    table.refresh()
    println(table.snapshots().asScala.map(_.snapshotId()).toList)
  }
}
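One way to see which snapshots expireOlderThan will actually consider is to print each snapshot's commit time next to the computed cutoff. A minimal sketch, reusing table and expireTimeMilliseconds from the program above:
// Print every snapshot's commit timestamp so it can be compared with the expiry cutoff.
table.snapshots().asScala.foreach { snapshot =>
  println(s"snapshot ${snapshot.snapshotId()} committed at ${snapshot.timestampMillis()}, cutoff $expireTimeMilliseconds")
}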
Upvotes: 0
Views: 1270
Reputation: 1
With the suggestion from manuzhang, I solved this problem. In Iceberg, at least one snapshot must always be kept.
I inserted 2024-04-08 data into the table, so it now has two snapshots. Then I expired yesterday's snapshot, and the related metadata and data files were deleted.
# before the expire & delete operation
$ tree db
db
└── table_name
    ├── data
    │   └── year=2024
    │       └── month=04
    │           ├── day=07
    │           │   └── dataType=PD
    │           │       └── 00000-0-04211945-446e-4d17-83c4-7e8447f5e4e4-00001.parquet
    │           └── day=08
    │               └── dataType=SFA
    │                   └── 00000-0-3aee6238-bd77-4e12-8112-d524613ee43b-00001.parquet
    └── metadata
        ├── bd91dae7-82ef-4ca6-86b8-525d49ecf6c6-m0.avro
        ├── bd91dae7-82ef-4ca6-86b8-525d49ecf6c6-m1.avro
        ├── d2d68629-eb2f-4582-b2ed-ebb17f923f72-m0.avro
        ├── snap-8158653185919657152-1-bd91dae7-82ef-4ca6-86b8-525d49ecf6c6.avro
        ├── snap-8662509695078634239-1-d2d68629-eb2f-4582-b2ed-ebb17f923f72.avro
        ├── v1.metadata.json
        ├── v2.metadata.json
        ├── v3.metadata.json
        └── version-hint.text
# after the expire and delete operation
db
└── table_name
    ├── data
    │   └── year=2024
    │       └── month=04
    │           ├── day=07
    │           │   └── dataType=PD
    │           └── day=08
    │               └── dataType=SFA
    │                   └── 00000-0-3aee6238-bd77-4e12-8112-d524613ee43b-00001.parquet
    └── metadata
        ├── bd91dae7-82ef-4ca6-86b8-525d49ecf6c6-m0.avro
        ├── bd91dae7-82ef-4ca6-86b8-525d49ecf6c6-m1.avro
        ├── snap-8158653185919657152-1-bd91dae7-82ef-4ca6-86b8-525d49ecf6c6.avro
        ├── v1.metadata.json
        ├── v2.metadata.json
        ├── v3.metadata.json
        ├── v4.metadata.json
        └── version-hint.text
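A minimal sketch of how the older snapshot can be expired while the newest one is kept (reusing the spark session and table from the program in the question; the helper name expireAllButLatest and the retainLast(1) call are my own additions, not part of the original program):
import scala.collection.JavaConverters.iterableAsScalaIterableConverter
import org.apache.iceberg.Table
import org.apache.iceberg.spark.actions.SparkActions
import org.apache.spark.sql.SparkSession

// Expire every snapshot committed before the newest one, but explicitly retain the
// current snapshot so the table always keeps a valid state to read from.
def expireAllButLatest(spark: SparkSession, table: Table): Unit = {
  val latestCommitMs = table.currentSnapshot().timestampMillis()
  SparkActions
    .get(spark)
    .expireSnapshots(table)
    .expireOlderThan(latestCommitMs) // cutoff right at the newest commit
    .retainLast(1)                   // keep at least the current snapshot
    .execute()
  table.refresh()
  println(table.snapshots().asScala.map(_.snapshotId()).toList)
}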
Then I asked ChatGPT why at least one snapshot must be kept. Here is the answer from ChatGPT:
In Apache Iceberg, the expireSnapshots operation is used to clean up old snapshots that are no longer needed. This is important for maintaining a manageable metadata size and for removing data files that are no longer referenced by any valid snapshot, thus freeing up storage space.
The reason why expireSnapshots must keep at least one snapshot is related to the core design of Iceberg, which relies on snapshots to provide a consistent view of the table at different points in time. Each snapshot represents the state of the table at the time it was taken and includes metadata pointing to the data files that make up the table at that moment.
Here are a few reasons why keeping at least one snapshot is important:
Consistency and Recoverability: The most recent snapshot is the one that represents the current state of the table. If all snapshots were deleted, there would be no reference to the current data files, and the table would effectively lose its current state. Keeping at least one snapshot ensures that there is always a consistent and recoverable state of the table.
Incremental Processing: Some data processing jobs rely on the ability to read the differences between snapshots to process data incrementally. If the last snapshot were removed, these jobs would not have a base snapshot to compare against for identifying new changes.
Auditing and Time Travel: Iceberg supports time travel queries, which allow users to query the table as it was at a specific point in time. This is only possible if there is a snapshot to reference that historical state. Without at least one snapshot, time travel to the most recent state would not be possible.
Referential Integrity: In some cases, downstream systems or processes may hold references to a specific snapshot for consistency guarantees. Removing all snapshots could break these references and potentially lead to data integrity issues.
Therefore, expireSnapshots is designed to prevent the deletion of the current (most recent) snapshot. Users can still remove older snapshots to clean up the history and associated data files, but the current state of the table is preserved.
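As an illustration of the time-travel point: a query pinned to a snapshot only works while that snapshot still exists. A small sketch, using the snapshot ID and warehouse path from the listings above purely as example values:
// Read the table as of a specific snapshot; this fails once that snapshot has been expired.
val asOfSnapshot = spark.read
  .format("iceberg")
  .option("snapshot-id", "8158653185919657152")
  .load("/Users/Code/spark/iceberg/db/table_name")
asOfSnapshot.show()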
Upvotes: 0