L. Chu

Reputation: 133

How does spark.csv determine the number of partitions on read?

In Spark 2.2.0, I'm reading a single file using

spark.read.format("csv").load("filepath").rdd.getNumPartitions

I get 77 partitions for a 350 MB file on one system and 88 partitions on another. I also get 226 partitions for a 28 GB file, which is roughly 28 * 1024 MB / 128 MB. The question is: how does the Spark CSV data source determine this default number of partitions?
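
For reference, the arithmetic behind that estimate, assuming a 128 MB default split size:

print((28 * 1024) // 128)  # 224 splits of 128 MB for 28 GB, close to the 226 partitions observed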

Upvotes: 3

Views: 7004

Answers (4)

Sandeep Patil

Reputation: 21

In my experience, it depends on spark.default.parallelism.

Scenario 1: File size: 75 MB, defaultParallelism: 8

>>> sc.defaultParallelism
8
>>> booksDF = spark.read.option("inferSchema","true").option("header","true").csv("file:///C:\\Users\\Sandeep\\Desktop\\data\\Turkish_Book_Dataset_Kaggle_V2.csv")
>>> booksDF.rdd.getNumPartitions()
8

Scenario 2: File size: 75 MB, defaultParallelism: 10

>>> sc.defaultParallelism
10
>>> booksDF = spark.read.option("inferSchema","true").option("header","true").csv("file:///C:\\Users\\Sandeep\\Desktop\\data\\Turkish_Book_Dataset_Kaggle_V2.csv")
>>> booksDF.rdd.getNumPartitions()
10

Scenario 3: File size: 75 MB, defaultParallelism: 4

>>> sc.defaultParallelism
4
>>> booksDF = spark.read.option("inferSchema","true").option("header","true").csv("file:///C:\\Users\\Sandeep\\Desktop\\data\\Turkish_Book_Dataset_Kaggle_V2.csv")
>>> booksDF.rdd.getNumPartitions()
4

Scenario 4: File size: 75 MB, defaultParallelism: 100

>>> sc.defaultParallelism
100
>>> booksDF = spark.read.option("inferSchema","true").option("header","true").csv("file:///C:\\Users\\Sandeep\\Desktop\\data\\Turkish_Book_Dataset_Kaggle_V2.csv")
>>> booksDF.rdd.getNumPartitions()
18

In Scenario 4, it divided the data into the maximum possible number of partitions, i.e. 18.

Based on this, I infer that the initial number of partitions depends on the value of spark.default.parallelism.

And if spark.default.parallelism is set to a higher number, it just creates as many partitions as possible, based on hashing.
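
As a minimal sketch (PySpark, assuming a local run; the CSV path is a placeholder), Scenario 2 can be reproduced by setting spark.default.parallelism when building the session:

from pyspark.sql import SparkSession

# Assumed local run; the CSV path below is a placeholder.
spark = (SparkSession.builder
         .master("local[*]")
         .config("spark.default.parallelism", "10")  # value from Scenario 2
         .getOrCreate())

print(spark.sparkContext.defaultParallelism)   # 10
booksDF = spark.read.option("header", "true").csv("file:///path/to/books.csv")
print(booksDF.rdd.getNumPartitions())          # 10 for the 75 MB file above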

Upvotes: 0

ladoe00

Reputation: 21

When reading CSV files (single large file or multiple smaller files, compressed or not), I find that spark.sql.files.maxPartitionBytes has a big impact on the number of resulting partitions. Tweaking this value (default of 128 MB; see https://spark.apache.org/docs/latest/sql-performance-tuning.html) was key for me.
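
As an illustration (assumed sizes and a placeholder path, using the roughly 350 MB file from the question), tweaking the setting at runtime caps the split size and therefore shifts the partition count:

# Assumed ~350 MB CSV at a placeholder path; maxPartitionBytes caps the split size.
spark.conf.set("spark.sql.files.maxPartitionBytes", str(64 * 1024 * 1024))   # cap splits at 64 MB
print(spark.read.csv("file:///path/to/350mb.csv").rdd.getNumPartitions())    # typically more partitions than the 128 MB default

spark.conf.set("spark.sql.files.maxPartitionBytes", str(256 * 1024 * 1024))  # cap splits at 256 MB
print(spark.read.csv("file:///path/to/350mb.csv").rdd.getNumPartitions())    # typically fewer partitions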

Upvotes: 1

Madhu Cheemala

Reputation: 19

The number of partitions when reading from any file follows the formula below.

Step 1: Find the file/folder size for the specified path. I tested this locally; you can adapt it to your storage (e.g. S3 or HDFS).

import os

def find_folder_size(path):
    # Recursively sum the sizes (in bytes) of all files under `path`.
    total = 0
    for entry in os.scandir(path):
        if entry.is_file():
            total += entry.stat().st_size
        elif entry.is_dir():
            total += find_folder_size(entry.path)
    return total

Step 2: Apply the formula

import math

target_partition_size = 200  # target partition size in MB (e.g. 100 or 200)
total_size = find_folder_size(path)  # path to the file/folder from Step 1
print('Total size: {}'.format(total_size))
num_partitions = int(math.ceil(total_size / 1024.0 / 1024.0 / float(target_partition_size)))
print(num_partitions)
PARTITION_COLUMN_NAME = ['a', 'c']
df = df.repartition(num_partitions, *PARTITION_COLUMN_NAME)
# or, without partition columns:
df = df.repartition(num_partitions)

We can apply this to either large or small data to get the number of partitions.
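
For example, plugging in the 28 GB file from the question with an assumed 200 MB target:

import math

total_size = 28 * 1024 * 1024 * 1024  # 28 GB in bytes (assumed)
target_partition_size = 200           # target partition size in MB
num_partitions = int(math.ceil(total_size / 1024.0 / 1024.0 / float(target_partition_size)))
print(num_partitions)                 # ceil(28 * 1024 / 200) = 144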

Upvotes: 0

JiriS

Reputation: 7540

The number of partitions is influenced by multiple factors, typically:

  • spark.default.parallelism
  • the number of files that you're reading (if reading files from a directory)
  • the cluster manager/number of cores (see Spark configuration), which influences spark.default.parallelism

The minimum number of partitions when reading from a text file (and CSV as well) should be determined as math.min(defaultParallelism, 2), i.e. defaultMinPartitions, based on CSVDataSource.
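
For example, a small PySpark sketch (assuming local mode) showing how the cores requested from the cluster manager drive defaultParallelism, and the corresponding defaultMinPartitions value:

from pyspark.sql import SparkSession

# Assuming local mode with 4 cores requested from the cluster manager.
spark = SparkSession.builder.master("local[4]").getOrCreate()
sc = spark.sparkContext

print(sc.defaultParallelism)    # 4, i.e. the number of cores in local[4]
print(sc.defaultMinPartitions)  # min(defaultParallelism, 2) = 2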

Upvotes: 1
