Reputation: 133
In Spark 2.2.0, I'm reading in one file using
spark.read.format("csv").load("filepath").rdd.getNumPartitions
I get 77 partitions for a 350 MB file on one system, and 88 partitions on another. I also get 226 partitions for a 28 GB file, which is roughly 28 * 1024 MB / 128 MB. The question is: how does the Spark CSV data source determine this default number of partitions?
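For reference, the rough arithmetic behind that 28 GB estimate (just back-of-the-envelope, assuming the default 128 MB value of spark.sql.files.maxPartitionBytes):
# Rough estimate only, assuming ~128 MB splits (spark.sql.files.maxPartitionBytes default)
file_size_mb = 28 * 1024          # 28 GB expressed in MB
split_size_mb = 128               # assumed split size
print(file_size_mb / split_size_mb)   # 224.0, close to the 226 partitions observed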
Upvotes: 3
Views: 7004
Reputation: 21
In my experience, it depends on spark.default.parallelism.
Scenario 1: File size: 75 MB, defaultParallelism: 8
>>> sc.defaultParallelism
8
>>> booksDF = spark.read.option("inferSchema","true").option("header","true").csv("file:///C:\\Users\\Sandeep\\Desktop\\data\\Turkish_Book_Dataset_Kaggle_V2.csv")
>>> booksDF.rdd.getNumPartitions()
8
Scenario 2: File size: 75 MB, defaultParallelism: 10
>>> sc.defaultParallelism
10
>>> booksDF = spark.read.option("inferSchema","true").option("header","true").csv("file:///C:\\Users\\Sandeep\\Desktop\\data\\Turkish_Book_Dataset_Kaggle_V2.csv")
>>> booksDF.rdd.getNumPartitions()
10
Scenario 3: File size: 75 MB, defaultParallelism: 4
>>> sc.defaultParallelism
4
>>> booksDF = spark.read.option("inferSchema","true").option("header","true").csv("file:///C:\\Users\\Sandeep\\Desktop\\data\\Turkish_Book_Dataset_Kaggle_V2.csv")
>>> booksDF.rdd.getNumPartitions()
4
Scenario 4: File size: 75 MB, defaultParallelism: 100
>>> sc.defaultParallelism
100
>>> booksDF = spark.read.option("inferSchema","true").option("header","true").csv("file:///C:\\Users\\Sandeep\\Desktop\\data\\Turkish_Book_Dataset_Kaggle_V2.csv")
>>> booksDF.rdd.getNumPartitions()
18
In scenario 4, it divided the data into the possible number of partitions, i.e. 18.
Based on this, I am inferring that the initial number of partitions depends on the value of spark.default.parallelism.
And if spark.default.parallelism is set to a higher number, then it just creates the possible number of partitions, based on hashing.
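If you want to reproduce these scenarios yourself, here is a minimal sketch (the CSV path is a placeholder, and spark.default.parallelism has to be set before the session/context is created):
from pyspark.sql import SparkSession

# Sketch: pin spark.default.parallelism up front, then compare it with the
# partition count of the DataFrame read from CSV.
spark = (SparkSession.builder
         .appName("partition-check")
         .config("spark.default.parallelism", "10")   # value to experiment with
         .getOrCreate())

booksDF = (spark.read
           .option("inferSchema", "true")
           .option("header", "true")
           .csv("file:///path/to/Turkish_Book_Dataset_Kaggle_V2.csv"))   # placeholder path

print(spark.sparkContext.defaultParallelism)
print(booksDF.rdd.getNumPartitions())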
Upvotes: 0
Reputation: 21
When reading CSV files (a single large file or multiple smaller files, compressed or not), I find that spark.sql.files.maxPartitionBytes has a big impact on the number of resulting partitions. Tweaking this value (default of 128 MB, see https://spark.apache.org/docs/latest/sql-performance-tuning.html) was key for me.
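For example, something like this (a sketch; the 64 MB value and the path are arbitrary) makes the effect easy to observe:
# Sketch: lower spark.sql.files.maxPartitionBytes and re-read the file.
# Halving the 128 MB default should roughly double the partition count
# for a large, splittable CSV file.
spark.conf.set("spark.sql.files.maxPartitionBytes", str(64 * 1024 * 1024))
df = spark.read.option("header", "true").csv("file:///path/to/large.csv")   # placeholder path
print(df.rdd.getNumPartitions())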
Upvotes: 1
Reputation: 19
The number of partitions when reading from a file can be estimated with the formula below.
Step 1: Find the file or folder size for the specified path. I tested this locally; you can adapt it to your storage (e.g. S3 or HDFS).
import os

def find_folder_size(path):
    # Recursively sum the sizes (in bytes) of all files under the path.
    total = 0
    for entry in os.scandir(path):
        if entry.is_file():
            total += entry.stat().st_size
        elif entry.is_dir():
            total += find_folder_size(entry.path)
    return total
Step 2: Apply the formula.
import math

target_partition_size = 200   # target partition size in MB (e.g. 100 or 200, depending on your needs)
total_size = find_folder_size(path)   # size in bytes, from Step 1
print('Total size: {}'.format(total_size))
num_partitions = int(math.ceil(total_size / 1024.0 / 1024.0 / float(target_partition_size)))
print('Number of partitions: {}'.format(num_partitions))
PARTITION_COLUMN_NAME = ['a', 'c']
df = df.repartition(num_partitions, PARTITION_COLUMN_NAME)
or
df = df.repartition(num_partitions)
We can apply this to either large or small data to get a number of partitions; a worked example is shown below.
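As a quick sanity check of the arithmetic (the 28 GB folder size is just an example):
import math

# Worked example: a 28 GB folder with a 200 MB target partition size.
total_size = 28 * 1024 * 1024 * 1024          # folder size in bytes (example value)
target_partition_size = 200                   # target partition size in MB
num_partitions = int(math.ceil(total_size / 1024.0 / 1024.0 / float(target_partition_size)))
print(num_partitions)                         # 144 partitions of roughly 200 MB each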
Upvotes: 0
Reputation: 7540
The number of partitions is influenced by multiple factors - typically spark.default.parallelism.
The number of partitions when reading from a text file (and CSV as well) should be determined as math.min(defaultParallelism, 2), based on CSVDataSource.
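For what it's worth, here is a rough Python sketch (my own reading of Spark 2.x's FileSourceScanExec, not Spark's actual code) of how the maximum split size for splittable files is picked, which is what ultimately drives the partition counts seen in the question:
# Rough sketch, not Spark's actual code: how Spark 2.x chooses a maximum split
# size for splittable files, based on my reading of FileSourceScanExec.
def estimate_max_split_bytes(file_sizes,
                             max_partition_bytes=128 * 1024 * 1024,   # spark.sql.files.maxPartitionBytes
                             open_cost_in_bytes=4 * 1024 * 1024,      # spark.sql.files.openCostInBytes
                             default_parallelism=8):                  # spark.default.parallelism
    total_bytes = sum(size + open_cost_in_bytes for size in file_sizes)
    bytes_per_core = total_bytes / default_parallelism
    return min(max_partition_bytes, max(open_cost_in_bytes, bytes_per_core))

# Example: one 28 GB file keeps the 128 MB split size, giving about 224 splits,
# which is in the ballpark of the 226 partitions reported in the question.
split = estimate_max_split_bytes([28 * 1024 ** 3])
print((28 * 1024 ** 3) / split)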
Upvotes: 1