Arun S

Reputation: 1463

How partitions are created in Spark

I'm looking for a detailed description of how partitions are created in Spark. I assume they are created based on the number of available cores in the cluster. But take, for example, a 512 MB file that needs to be processed; this file is stored in my storage (which can be HDFS or an S3 bucket) with a block size of either 64 MB or 128 MB. For this case, assume my cluster has 8 cores. When the file is processed by the Spark program, how will it be partitioned? I hope the 512 MB file will be divided into 8 different partitions and executed on 8 cores. Please provide suggestions on this.
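Roughly, I'm reading the file like this (the path and format are just placeholders) and then checking how many partitions Spark actually created:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("PartitionCheck")
  .getOrCreate()

// Read the ~512 MB file; in my case it sits on HDFS or S3.
val df = spark.read.text("hdfs:///data/myfile.txt")

// Number of partitions Spark created for this file.
println(df.rdd.getNumPartitions)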

Upvotes: 1

Views: 210

Answers (1)

Wu.dior

Reputation: 81

I found something in the source code of FilePartition.scala. It seems the number of partitions is related to maxSplitBytes (derived in part from the configuration parameter spark.sql.files.maxPartitionBytes) and filesOpenCostInBytes (spark.sql.files.openCostInBytes):

def getFilePartitions(
    sparkSession: SparkSession,
    partitionedFiles: Seq[PartitionedFile],
    maxSplitBytes: Long): Seq[FilePartition] = {
  val partitions = new ArrayBuffer[FilePartition]
  val currentFiles = new ArrayBuffer[PartitionedFile]
  var currentSize = 0L

  /** Close the current partition and move to the next. */
  def closePartition(): Unit = {
    if (currentFiles.nonEmpty) {
      // Copy to a new Array.
      val newPartition = FilePartition(partitions.size, currentFiles.toArray)
      partitions += newPartition
    }
    currentFiles.clear()
    currentSize = 0
  }

  val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
  // Assign files to partitions using "Next Fit Decreasing"
  partitionedFiles.foreach { file =>
    if (currentSize + file.length > maxSplitBytes) {
      closePartition()
    }
    // Add the given file to the current partition.
    currentSize += file.length + openCostInBytes
    currentFiles += file
  }
  closePartition()
  partitions.toSeq
}
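maxSplitBytes itself is derived from spark.sql.files.maxPartitionBytes (128 MB by default), spark.sql.files.openCostInBytes (4 MB by default), and the available parallelism. A simplified sketch of that calculation (not the exact Spark source; it assumes default configs and the 8 cores from the question):

// Simplified from FilePartition.maxSplitBytes, assuming default configs.
val defaultMaxSplitBytes = 128L * 1024 * 1024  // spark.sql.files.maxPartitionBytes
val openCostInBytes      = 4L * 1024 * 1024    // spark.sql.files.openCostInBytes
val parallelism          = 8L                  // the 8 cores in the question

// One 512 MB file; each file is padded by the open cost.
val totalBytes   = 512L * 1024 * 1024 + openCostInBytes
val bytesPerCore = totalBytes / parallelism

val maxSplitBytes = math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))
println(maxSplitBytes)  // ~64.5 MB

With maxSplitBytes around 64.5 MB, the splittable 512 MB file is cut into roughly 8 slices, which matches the 8 partitions the question expects.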

Upvotes: 0
