Sunil Thapa
Sunil Thapa

Reputation: 11

Why does Apache Spark partitions CSV read based on the file size and how do I change the partitions?

Here is my pyspark code:

csv_file = "/FileStore/tables/mnt/training/departuredelays02.csv"
schema   = "`date` STRING, `delay` INT, `distance` INT, `origin` STRING, `destination` STRING"
df = (spark
     .read
     .format("csv")                    
     .option("header","true")
     .schema(schema)
     .load(csv_file)                  
        )
partitions = df.rdd.getNumPartitions()
print(partitions)

The csv file has 487178 rows.

After I print the partitions, the result I get is 3 partitions.

Please note that I have 2 workers with 4 cores. That means a total of 8 slots.

Now if I try to load the following file which is much larger with 1391578 rows:

csv_file = "/FileStore/tables/mnt/training/departuredelays.csv"

I get a partition of 8.

My question is how do I force the first CSV to partition same way as the larger file. I am aware that repartition can be used but I was curious to know whether this can be done without any shuffle? And even if we repartition it, it seems to create a job with 3 tasks instead of 8 tasks.

Here is what I get after I run the following code snippet:

df = df.repartition(8)
print(df.count())

The first stage of the first task still has 3 tasks allocated.

Output:

(3) Spark Jobs
Job 93 View(Stages: 1/1)
Stage 123: 3/3
Job 94 View(Stages: 1/1, 1 skipped)
Stage 124: 0/3 skipped
Stage 125: 8/8
Job 95 View(Stages: 1/1, 2 skipped)
Stage 126: 0/3 skipped
Stage 127: 0/8 skipped
Stage 128: 1/1

Upvotes: 1

Views: 491

Answers (1)

mani_nz
mani_nz

Reputation: 5622

You can try using coalesce which would do a sensible shuffle than repartition.

df = spark
     .read
     .format("csv")                    
     .option("header","true")
     .schema(schema)
     .load(csv_file)                  
     .coalesce(8)

Check this out for more info Spark - repartition() vs coalesce()

Upvotes: -1

Related Questions