Reputation: 61
I'm trying to write data to an Iceberg table from Spark Structured Streaming (written in Scala).
Writer code:
import java.util.concurrent.TimeUnit
import org.apache.spark.sql.streaming.Trigger

val streamResult = joined.writeStream
  .format("iceberg")
  .partitionBy("column1", "column2")
  .outputMode("append")
  .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
  .option("path", outputTable)
  .option("checkpointLocation", "s3://checkpointLocation")
  .option("fanout-enabled", "true")
  .start()

streamResult.awaitTermination()
And I'm getting this error:
org.apache.spark.sql.AnalysisException: bucket(20, columnX) is not currently supported
I know that on a DataFrame write you can use the 'bucketBy' method, but how can I achieve the same with writeStream?
Versions:
Iceberg: 1.3.0
Spark: 3.3.2
Scala: 2.12
Data is read from Iceberg tables, and I expect it to appear in the output Iceberg table.
Edit:
Iceberg output table schema - partitioning and bucketing
"partition-specs" : [ {
"spec-id" : 0,
"fields" : [ {
"name" : "columnX_bucket",
"transform" : "bucket[20]",
"source-id" : 242,
"field-id" : 1000
}, {
"name" : "column4_day",
"transform" : "day",
"source-id" : 10,
"field-id" : 1001
} ]
} ]
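For reference, a spec like this is usually declared through Iceberg's Spark SQL extensions. A minimal sketch, assuming the extensions are enabled on the session and using "db.output_table" as a placeholder table name:

// Hedged sketch: DDL that yields the bucket[20] + day spec shown above
spark.sql("ALTER TABLE db.output_table ADD PARTITION FIELD bucket(20, columnX)")
spark.sql("ALTER TABLE db.output_table ADD PARTITION FIELD days(column4)")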
Edit 2:
See my answer below, but it comes with another question.
Upvotes: 1
Views: 643
Reputation: 61
I've tried to set the bucket using foreachBatch, and it seems the bucket problem is solved. However, the 'write' method doesn't allow setting a days(column4) partition (I can only pass column names as strings), so partitionBy sets identity(column4) instead.
import org.apache.spark.sql.{DataFrame, SaveMode}

val streamResult = joined.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.write
      .format("iceberg")
      .mode(SaveMode.Append)
      .bucketBy(20, "columnX")
      .partitionBy("column4")
      // txnVersion + txnAppId make the per-batch write idempotent on replay
      .option("txnVersion", batchId)
      .option("txnAppId", "appId")
      .saveAsTable(table)
  }
  .start()
I see that the writeTo method accepts Columns instead of strings, so
.partitionedBy(bucket(20, col("columnX")), days(col("column4")))
could help, but partitionedBy is only available on the create path, and I cannot append the data without more changes to the table itself. There is append(), but it doesn't allow setting partitionedBy.
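To make the constraint concrete, here is a sketch of the two writeTo paths (the transform functions come from org.apache.spark.sql.functions; "db.output_table" is a placeholder, and batchDF stands for the micro-batch DataFrame inside foreachBatch):

import org.apache.spark.sql.functions.{bucket, col, days}

// Create path: partitionedBy is available, but it (re)creates the table
batchDF.writeTo("db.output_table")
  .partitionedBy(bucket(20, col("columnX")), days(col("column4")))
  .createOrReplace()

// Append path: writes into the existing table, but exposes no partitionedBy
batchDF.writeTo("db.output_table")
  .append()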
So, to summarize:
- writeStream doesn't set any bucket
- write can't set days()
- writeTo doesn't allow appending data with a defined bucket
How can I solve this so that the full partitioning, with both the bucket and the days column, is set?
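One possible way out, as far as I understand Iceberg: the partition spec (bucket[20] on columnX, day on column4) is already stored in the table metadata shown above, and appends follow the table's own spec, so the writer may not need to declare any partitioning at all. A minimal sketch under that assumption:

joined.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.writeTo(table)
      // same idempotency options as in the snippet above
      .option("txnVersion", batchId)
      .option("txnAppId", "appId")
      .append()
  }
  .start()
  .awaitTermination()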
Upvotes: 0