Netrunner

Reputation: 61

Scala Spark Iceberg writeStream. How to set bucket?

I'm trying to write data to an Iceberg table with Spark Structured Streaming (written in Scala).

Writer code:

    val streamResult = joined.writeStream
      .format("iceberg")
      .partitionBy("column1", "column2")
      .outputMode("append")
      .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
      .option("path", outputTable)
      .option("checkpointLocation", "s3://checkpointLocation")
      .option("fanout-enabled", "true")
      .start()
      .awaitTermination()

and I'm getting this error:

org.apache.spark.sql.AnalysisException: bucket(20, columnX) is not currently supported

I know that on a DataFrame write you can use the 'bucketBy' method, but how can I achieve the same with writeStream?

Versions:
Iceberg: 1.3.0
Spark: 3.3.2
Scala: 2.12

Data is read from Iceberg tables, and I'm expecting it to appear in the output Iceberg table.

Edited:
Iceberg output table schema - partitioning and bucketing

    "partition-specs" : [ {
      "spec-id" : 0,
      "fields" : [ {
        "name" : "columnX_bucket",
        "transform" : "bucket[20]",
        "source-id" : 242,
        "field-id" : 1000
      }, {
        "name" : "column4_day",
        "transform" : "day",
        "source-id" : 10,
        "field-id" : 1001
      } ]
    } ]

Edited2:
Check my answer below, but it comes with another question.

Upvotes: 1

Views: 643

Answers (1)

Netrunner

Reputation: 61

I've tried to set the bucket by using foreachBatch, and it seems the bucket problem is solved. However, the 'write' method doesn't allow setting a days(column4) partition (I can only pass column names as strings); partitionBy sets identity(column4).

    val streamResult = joined.writeStream
      .foreachBatch((batchDF: DataFrame, batchId: Long) =>
        batchDF.write
          .bucketBy(20, "columnX")
          .format("iceberg")
          .mode(SaveMode.Append)
          .partitionBy("column4")
          .option("txnVersion", batchId)
          .option("txnAppId", "appId")
          .saveAsTable(table))
      .start()
      .awaitTermination()

I see that the writeTo method allows passing Columns instead of strings, so

.partitionedBy(bucket(20, col("columnX")), days(col("column4")))

could help, but writeTo only offers createOrReplace, and I cannot append the data without further changes to the table itself. There is append(), but it doesn't allow setting partitionedBy.
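One avenue worth trying (a sketch, not verified against the question's setup): since the target table already carries the bucket[20] and day partition spec in its metadata, Iceberg should distribute appended rows according to that spec without the writer re-declaring it, so foreachBatch combined with writeTo(...).append() may be enough:

    val streamResult = joined.writeStream
      .foreachBatch((batchDF: DataFrame, batchId: Long) =>
        // The partition spec (bucket(20, columnX), day(column4)) lives in the
        // table metadata, so append() should not need partitionedBy at all.
        batchDF.writeTo(table)
          .option("txnVersion", batchId)
          .option("txnAppId", "appId")
          .append())
      .start()
      .awaitTermination()

The txnVersion/txnAppId options are carried over from the code above to keep the per-batch writes idempotent on retries.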

  • writeStream doesn't set any bucket
  • write can't set days()
  • writeTo doesn't allow appending data with a defined bucket

How can I solve this so that the full partitioning is applied, with both the bucket and the days column?

Logs:

  • provided: identity(column4), bucket(20, columnX)
  • table: bucket(20, columnX), days(column4)

Upvotes: 0
