luc41x
luc41x

Reputation: 11

Spark & Iceberg - How to Avoid Shuffle During GroupBy with Partitioned Iceberg Tables

I'm trying out Apache Iceberg on Glue/S3 in conjunction with Apache Spark, using Spark version 3.5. As a small POC, I created an Iceberg table in order to store some events, which have a UID and a date field, among others, in parquet format:

val partitionSpec = PartitionSpec.builderFor(schema)
    .identity("date")
    .bucket("user_uid", 2048)
    .build()

val tableProperties: Map[String, String] = Map(
    "format-version" -> "2",
    "history.expire.max-snapshot-age-ms" -> (7.days).toMillis.toString,
  )

val table = glueCatalog.createTable(
      TableIdentifier.of(Namespace.of("db"), "events"),
      eventSchemaDefinedWithIcebergDsl,
      partitionSpec,
      tableProperties.asJava
    )
table.replaceSortOrder().asc("user_uid").commit()

The idea here is to store all events partitioned by date and subpartitioned (hash) by UID. Assume this table is populated correctly with some event data. My goal would now be to aggregate these events using Spark and dumping them into a second Iceberg table. This second aggregate table has the same properties as the events table, in particular the partitioning scheme definition is exactly the same as above.

I then try to aggregate the events using this simple Spark SQL query.

// Event is a case class defined by us that maps to Iceberg input state
// analogously, State is a case class that maps to Iceberg output state
spark.read.format("iceberg")
   .load("events").as[Event]  
   .where(col("date").between(startDate, endDate)) // this prunes the date query correclty!
   .groupBy(col("user_uid"))
   .as[String, Event] 
   .mapValues(toInitialState(_)) // toInitialState maps from Event -> State
   .reduceGroups((s1, s2) => updateState(s1, s2)) // updateState merges two State objects
   .map{case (k,s) => s}
   .writeTo("states") 
   .append() 

Our hope here was that Spark would recognize that the groupBy does not induce a shuffle, since input data is already partitioned by user_uid in Iceberg, provided that Spark picks up this partitioning. However, this was not the case, and Spark issued a reshuffle both for the groupBy and the final write into the result table.

My question: Is fusing stages based on partitioned input data like in the above scenario something Spark and Iceberg are expected to be capable of? What about Spark choosing an appropriate partitioner based on hidden partitioning? If so, we would like to find out what should improve in our setup.

We tried to nudge Spark into the right direction by playing around with various parameters, e.g. 
user_uid partition size, max input file size, trying out variations of the above query or by instructing/forcing Spark to use the uid partitioning scheme:

spark.read.format("iceberg")
   .load("events").as[Event] 
   .where(col("date").between(startDate, endDate))
   .repartition(2048, col("user_uid"))
   .sortWithinPartitions()

or via repartition(bucket(2048, col("user_uid"))), which did not work with the error org.apache.spark.SparkException: [INTERNAL_ERROR] Cannot generate code for expression: bucket(2048, input[4, string, false]).

Our input data for this task was about 35GB of data per day, which would result in 35 MB of data per input partition for 2048 partitions, which also seems reasonable. No matter what we do, Spark does not assign a correct partitioner by user_uid when reading the data, as our experiments confirmed multiple times.

Upvotes: 1

Views: 620

Answers (1)

Atif
Atif

Reputation: 2210

Partitioning in Iceberg is a logical concept, so Iceberg partitioning has nothing to do with Spark shuffle operation. Iceberg tracks the partitioning using manifest list which is a metadata file which helps in partition pruning so that only the files relevant to the filter criteria based on partition column are picked. Since group by is a wide transformation Spark will trigger shuffle operation whose impact you can try to minimize by using repartition to make sure that data is evenly distributed.

Upvotes: 0

Related Questions