d125q

Reputation: 1666

Aggregating large Datasets in Spark SQL

Consider the following code:

case class Person(
  personId: Long, name: String, ageGroup: String, gender: String,
  relationshipStatus: String, country: String, state: String
)

case class PerPersonPower(personId: Long, power: Double)

val people: Dataset[Person] = ...          // Around 50 million entries.
val powers: Dataset[PerPersonPower] = ...  // Around 50 million entries.

import org.apache.spark.sql.functions.{count, sum}

people.join(powers, "personId")
  .groupBy("ageGroup", "gender", "relationshipStatus", "country", "state")
  .agg(
    sum("power").alias("totalPower"),
    count("*").alias("personCount")
  )

It is executed on a cluster with approximately 100 GB of RAM. However, the cluster runs out of memory. I am not sure what to do. In reality, people is partitioned by $"personId" and cached -- people.repartition($"personId").cache().

Any ideas how I might optimize this computation?

The cluster is a vanilla Google Dataproc cluster --- so it uses YARN in client mode --- consisting of 14 nodes with 8 GB RAM each.

Upvotes: 5

Views: 2127

Answers (1)

nefo_x

Reputation: 3088

From the limited information in the question, I can suggest two things: drop the cache, and use more shuffle partitions than the default (usually 200, but it may differ from cluster to cluster). Try setting spark.sql.shuffle.partitions in your app to 1000 or 2000 to start with, e.g. spark.conf.set("spark.sql.shuffle.partitions", 1000). Most likely your query hits a SortMergeJoin, and at the moment each executor receives more data than its heap (minus the YARN overhead) can hold. Consult the Spark UI for the cluster to monitor and optimize the query execution. The SQL tab shows detailed numbers about how much data is processed in each stage, so you can identify bottlenecks and fix them faster.
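A rough sketch of that suggestion, assuming spark is the active SparkSession and people / powers are the Datasets from the question (the 2000 figure and the totals name are placeholders to tune, not recommendations):

import org.apache.spark.sql.functions.{count, sum}

// Raise the number of shuffle partitions before running the query;
// 2000 is only a starting point for this cluster size.
spark.conf.set("spark.sql.shuffle.partitions", "2000")

// Skip people.repartition($"personId").cache(): caching ~20 GB on
// 14 x 8 GB nodes competes with the shuffle for the same executor memory.
val totals = people.join(powers, "personId")
  .groupBy("ageGroup", "gender", "relationshipStatus", "country", "state")
  .agg(
    sum("power").alias("totalPower"),
    count("*").alias("personCount")
  )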

With a sort-merge join, the query planner first shuffles and sorts both PerPersonPower and Person by personId into the number of partitions defined by spark.sql.shuffle.partitions, spilling to the executors' local disks where needed, then computes the same number of partial aggregates and combines them into your resulting DataFrame.
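To see which join strategy the planner actually picked, print the physical plan (using the hypothetical totals name from the sketch above):

// Look for SortMergeJoin vs. BroadcastHashJoin nodes, and for
// Exchange hashpartitioning(personId, ...) marking the shuffle.
totals.explain()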

It seems that you are joining around 18-20 GB of data (people) with about 800 MB (powers). If powers were a bit smaller, you could try to utilize a BroadcastHashJoin, like people.join(broadcast(powers), "personId"), though I wouldn't recommend broadcasting DataFrames larger than 128 MB or 256 MB.
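If powers did fit within that limit, the broadcast variant would look roughly like this (a sketch only; checking the actual size of powers is up to you):

import org.apache.spark.sql.functions.broadcast

// Hint Spark to ship the smaller side to every executor, replacing the
// shuffle-heavy SortMergeJoin with a BroadcastHashJoin for the join step.
val joined = people.join(broadcast(powers), "personId")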

Good luck!

Upvotes: 3
