domisj

Reputation: 31

How to add partitioning to existing Iceberg table

How do I add partitioning to an existing Iceberg table that is not partitioned? The table is already loaded with data.

The table was created like this:

import org.apache.iceberg.hive.HiveCatalog
import org.apache.iceberg.catalog._
import org.apache.iceberg.spark.SparkSchemaUtil
import org.apache.iceberg.PartitionSpec
import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.functions.lit

val df1 = spark
  .range(1000)
  .toDF
  .withColumn("level",lit("something"))

val catalog = new HiveCatalog(spark.sessionState.newHadoopConf())

val icebergSchema = SparkSchemaUtil.convert(df1.schema)

val icebergTableName = TableIdentifier.of("default", "icebergTab")

val icebergTable = catalog
  .createTable(icebergTableName, icebergSchema, PartitionSpec.unpartitioned)

Any suggestions?

Upvotes: 3

Views: 6209

Answers (2)

blackbishop

Reputation: 32660

For Spark 3.x, you can use the ALTER TABLE SQL extensions to add a partition field to an existing table:

Iceberg supports adding new partition fields to a spec using ADD PARTITION FIELD:

spark.sql("ALTER TABLE default.icebergTab ADD PARTITION FIELD level")

Adding a partition field is a metadata operation and does not change any of the existing table data. New data will be written with the new partitioning, but existing data will remain in the old partition layout. Old data files will have null values for the new partition fields in metadata tables.
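
To see this, here is a rough sketch (assuming the session catalog is configured for Iceberg as above; the inserted values are just placeholders): write a row after the ALTER and inspect the files metadata table. Files written before the change show a null level in their partition column, while new files carry the actual value.

// Insert a new row, then compare the partition column across data files.
spark.sql("INSERT INTO default.icebergTab VALUES (1001, 'something')")
spark.sql("SELECT partition, file_path FROM default.icebergTab.files").show(false)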

Upvotes: 2

Ryan Blue

Reputation: 51

Right now, the way to add partitioning is to update the partition spec manually.

import org.apache.iceberg.BaseTable
import org.apache.iceberg.PartitionSpec

// Load the table and reach its underlying TableOperations.
val table = catalog.loadTable(icebergTableName)
val ops = table.asInstanceOf[BaseTable].operations

// Build the new spec against the current schema, then commit new metadata.
val spec = PartitionSpec.builderFor(table.schema).identity("level").build

val base = ops.current
val newMeta = base.updatePartitionSpec(spec)
ops.commit(base, newMeta)

There is a pull request to add an operation for making these changes, like addField("level"), but it isn't quite finished yet. I think it will be in the 0.11.0 release.
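
For reference, a sketch of roughly what that API is expected to look like (reusing the catalog and table identifier from the question; treat this as tentative until 0.11.0 is released):

// Fluent partition-spec update: add an identity partition field on `level`.
val table = catalog.loadTable(icebergTableName)
table.updateSpec()
  .addField("level")
  .commit()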

Keep in mind:

  • After you change the partition spec, the existing data files will have null values in metadata tables for the partition fields. That doesn't mean that the values would have been null if the data were written with the new spec, just that the metadata doesn't have the values for existing data files.
  • Dynamic partition replacement behaves differently under the new spec because the granularity of a partition changes. Without a spec, INSERT OVERWRITE will replace the whole table. With a spec, just the partitions with new rows will be replaced. To avoid this, we recommend using the DataFrameWriterV2 interface in Spark, where you can be more explicit about which data values are overwritten (see the sketch after this list).
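
Here is a rough sketch of the DataFrameWriterV2 approach (assuming Spark 3 with the session catalog configured for Iceberg; the sample values are placeholders):

import org.apache.spark.sql.functions.{col, lit}

// A new batch of rows to land in the table.
val newData = spark.range(1000, 2000).toDF.withColumn("level", lit("something"))

// Replace exactly the rows matching an explicit condition...
newData.writeTo("default.icebergTab").overwrite(col("level") === "something")

// ...or replace only the partitions that newData touches.
// newData.writeTo("default.icebergTab").overwritePartitions()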

Upvotes: 5
