Louis

Reputation: 11

How does Spark read unpartitioned Delta tables?

I observe severe CPU underutilization in my Databricks job run metrics, on average less than 50%, which indicates that I am not running enough tasks in parallel in the Spark workflow.

I am especially interested in improving the job's read parallelism. For context, I read multiple tables.

Is my understanding correct that, in the read stage, Spark creates the same number of tasks as the table being read has partitions? Moreover, what does this look like when my table is not partitioned?

I know that by default the configuration option spark.sql.shuffle.partitions = 200. However, the Spark docs state that this property applies to wide transformations (i.e. joins and aggregations) happening in shuffle stages, which come after the initial read stage, if I am not mistaken:

Configures the number of partitions to use when shuffling data for joins or aggregations.

Since reading is neither a join nor an aggregation, I wonder whether this default value (i.e. 200) determines the level of Spark parallelism (i.e. 200 tasks) used to fill the 6*16 = 96 cores in my cluster when reading that unpartitioned data.

On the unpartitioned table, I have experimented with spark.conf.set("spark.sql.files.maxPartitionBytes", "64MB"), which controls the maximum number of bytes packed into a single partition when reading files (default: 128 MB), to increase the number of Spark partitions, which increases the number of tasks and, therefore, parallelism.

However, I have not seen a significant improvement in performance.
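
For reference, this is roughly what I ran (the Delta table path below is just illustrative):

    # PySpark on Databricks; the Delta table path is illustrative.
    spark.conf.set("spark.sql.files.maxPartitionBytes", "64MB")   # down from the 128 MB default

    df = spark.read.format("delta").load("/mnt/datalake/my_unpartitioned_table")

    # Number of partitions Spark planned for the scan (one read task per partition):
    print(df.rdd.getNumPartitions())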

Upvotes: 1

Views: 63

Answers (1)

Ged

Reputation: 18098

Whether a (Delta) table is partitioned or not, or you are reading a pruned partition, the same mechanics apply:

  1. spark.sql.shuffle.partitions plays no role in reading data at rest; it is only relevant for shuffling due to JOINs, aggregations, etc.
  2. It does not matter whether you use PySpark or the Scala API.
  3. spark.default.parallelism is only relevant for RDDs, so leave it out of the equation.
  4. spark.sql.files.maxPartitionBytes is the relevant setting, and one could argue the same applies to plain Parquet files. (A quick way to inspect these settings is sketched below.)
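
A minimal sketch (PySpark, but the same settings apply from Scala), assuming the usual spark session available in a Databricks notebook; the printed values are illustrative:

    # Assumes the usual Databricks `spark` session; printed values are illustrative.
    print(spark.conf.get("spark.sql.shuffle.partitions"))        # only used for shuffles (joins, aggregations)
    print(spark.sparkContext.defaultParallelism)                 # only relevant for the RDD API
    print(spark.conf.get("spark.sql.files.maxPartitionBytes"))   # drives how many splits a file scan produces
    spark.conf.set("spark.sql.files.maxPartitionBytes", "64MB")  # smaller splits, hence more read tasks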

Imagine a Delta table with 5 files: 1 very large file of size X and 4 smaller files of size X/5 each. The app has 2 vCores available and spark.sql.files.maxPartitionBytes = X/2.

Then:

  1. The number of tasks / the parallelization will be:
    • 4 tasks for the smaller files of size X/5, since each fits within the X/2 limit
    • 2 tasks for the large file of size X, which is split at X/2
  2. Only 2 of these tasks can run concurrently, because only 2 vCores are available. (See the rough calculation sketched below.)
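
A back-of-the-envelope version of that arithmetic in plain Python (the byte sizes are illustrative; real Spark also packs small files together up to maxPartitionBytes and adds an open cost per file, so the exact count can differ):

    import math

    X = 1024 * 1024 * 1024                 # size of the large file (illustrative: 1 GB)
    files = [X] + [X // 5] * 4             # 1 large file + 4 smaller files
    max_partition_bytes = X // 2           # spark.sql.files.maxPartitionBytes = X/2

    # One task per split; a file bigger than maxPartitionBytes is cut into several splits.
    splits_per_file = [math.ceil(size / max_partition_bytes) for size in files]
    print(splits_per_file)                 # [2, 1, 1, 1, 1]
    print(sum(splits_per_file))            # 6 read tasks in total

    vcores = 2
    print(math.ceil(sum(splits_per_file) / vcores))   # 3 waves of 2 concurrent tasks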

Upvotes: 1
