Metadata

Reputation: 2085

How will spark load a huge csv file if the entire file is present on a single node?

If I have a huge CSV file of 50GB on a single HDFS node and I am trying to read the file using spark.read as below:

file_df = spark.read.format('csv').option('header', 'true').option('inferSchema', 'true').load('/hdfspath/customer.csv')

and I am submitting the spark job using the below spark-submit:

spark-submit --master yarn --deploy-mode cluster --num-executors 4 --executor-memory 3G --executor-cores 5 --driver-memory 1G load_csv.py

I understand that Spark does not load any data into memory until an action is called. But what happens when an action is triggered and the first step is reading the file into memory to kick-start the transformations? How does Spark read the 50GB file in small parts based on the cores and executors I specified?

For example: I specified 4 executors with 3GB of memory each. While reading, will Spark split the main customer.csv file into 3GB chunks, one per executor, and load the file like this:

For the first 12GB:

Executor 1: 3GB
Executor 2: 3GB
Executor 3: 3GB
Executor 4: 3GB

and so on until the whole file has been processed?

Or will it split the file based on the HDFS block size and read it block by block (for example 128MB), fitting as many blocks as it can into each 3GB executor?

If a file is entirely present on a single node (which it is in my case), how will Spark process it?

I understand it's a bit broad and cumbersome to explain, but any help would be greatly appreciated.

Upvotes: 1

Views: 2570

Answers (1)

sathya

Reputation: 2072

IIUC,

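These are well-known general practices for tuning Spark to process large datasets (50 GB is not a huge dataset, though).

1. Will it split the file based on HDFS block size and read block by block, e.g. 128MB, fitting as many blocks as it can into each 3GB executor?

Ans: Yes. For splittable file formats, Spark creates roughly one partition per HDFS block (128 MB by default), not one chunk sized to the executor memory. For the DataFrame reader, the split size is capped by spark.sql.files.maxPartitionBytes, which also defaults to 128 MB. Each executor core then processes one partition at a time, so the whole file is never held in memory at once. If all blocks sit on one node, executors running on other nodes read their splits over the network.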
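To make the block-based split concrete for the 50GB file in the question, here is a rough back-of-the-envelope sketch in plain Python. The function name `estimate_read_plan` and its parameters are hypothetical, and it assumes a fixed 128 MB split size; the real partition count also depends on settings such as `spark.sql.files.maxPartitionBytes`, and the deserialized in-memory size of a split can differ from its size on disk.

```python
import math


def estimate_read_plan(file_size_mb, split_size_mb=128,
                       num_executors=4, cores_per_executor=5):
    """Estimate how a splittable file is divided into tasks (illustrative only)."""
    # One partition (and therefore one read task) per split of the file.
    partitions = math.ceil(file_size_mb / split_size_mb)
    # Each core runs one task at a time, so this is the maximum parallelism.
    task_slots = num_executors * cores_per_executor
    # Number of rounds needed to work through all partitions.
    waves = math.ceil(partitions / task_slots)
    # Raw bytes being read by one executor at any moment (before deserialization).
    in_flight_mb_per_executor = cores_per_executor * split_size_mb
    return {
        "partitions": partitions,
        "task_slots": task_slots,
        "waves": waves,
        "in_flight_mb_per_executor": in_flight_mb_per_executor,
    }


# 50 GB file, 4 executors x 5 cores, as in the question's spark-submit.
plan = estimate_read_plan(50 * 1024)
print(plan)
```

Under these assumptions, each executor reads only a handful of 128 MB splits at a time rather than a 3GB chunk, which is why the executor memory does not determine the chunk size.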

2. Storage levels in memory and on disk

Calling cache() on an RDD is the same as persist(StorageLevel.MEMORY_ONLY): Spark tries to keep all partitions in memory, and partitions that do not fit are not cached and are recomputed when needed, which can put heavy pressure on memory. (For a DataFrame, cache() defaults to MEMORY_AND_DISK.) If you call persist(StorageLevel.MEMORY_AND_DISK), it will store as much as it can in memory and the rest will be put on disk. If the data doesn't fit on disk either, the OS will usually kill your workers.

Note that Spark has its own little memory management system. Some of the memory that you assign to your Spark job is used to hold the data being worked on and some of the memory is used for storage if you call cache or persist.

from pyspark.storagelevel import StorageLevel

# Define the CSV read; note that inferSchema needs an extra pass over the data
file_df = (
    spark.read.format('csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load('/hdfspath/customer.csv')
)

# Keep what fits in memory and spill the remaining partitions to disk
file_df = file_df.persist(StorageLevel.MEMORY_AND_DISK)
# Alternative: file_df = file_df.persist(StorageLevel.DISK_ONLY)

Storage Level        Space used  CPU time  In memory  On-disk  Serialized  Recompute some partitions
----------------------------------------------------------------------------------------------------
MEMORY_ONLY          High        Low       Y          N        N           Y
MEMORY_ONLY_SER      Low         High      Y          N        Y           Y
MEMORY_AND_DISK      High        Medium    Some       Some     Some        N
MEMORY_AND_DISK_SER  Low         High      Some       Some     Y           N
DISK_ONLY            Low         High      N          Y        Y           N
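The note above about Spark's own memory management can be made a little more concrete with a small calculation. This is only a sketch: it assumes the unified memory model with a 300 MB reservation, `spark.memory.fraction = 0.6` and `spark.memory.storageFraction = 0.5` (the defaults in recent Spark versions, but check your own version), and the helper name `unified_memory_mb` is hypothetical.

```python
RESERVED_MB = 300  # assumption: fixed amount Spark reserves out of the heap


def unified_memory_mb(executor_heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Approximate split of an executor heap into storage and execution memory."""
    # Memory shared between execution (shuffles, joins, sorts) and storage (cache/persist).
    usable = (executor_heap_mb - RESERVED_MB) * memory_fraction
    # Portion of the shared region that cached blocks are protected in.
    storage = usable * storage_fraction
    # The remainder is the initial share for execution; the boundary is soft.
    execution = usable - storage
    return usable, storage, execution


# --executor-memory 3G from the question
usable, storage, execution = unified_memory_mb(3 * 1024)
print(f"unified: {usable:.0f} MB, storage: {storage:.0f} MB, execution: {execution:.0f} MB")
```

With a 3G executor, only a fraction of the heap is available for cached partitions under these assumptions, which is why persisting a 50GB dataset with 4 such executors relies heavily on the disk part of MEMORY_AND_DISK.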

3. Try the following options for the memory settings.

spark-submit --master yarn --deploy-mode cluster --num-executors 4 --executor-memory 3G --executor-cores 5 --driver-memory 3G load_csv.py

Let's say you have a 10-node cluster with the following configuration:

**Cluster Config:**
10 Nodes
16 cores per Node
64GB RAM per Node

3.1 First Approach: Tiny executors (One Executor per core):

- `--num-executors` = `In this approach, we'll assign one executor per core`
                    = `total-cores-in-cluster`
                    = `num-cores-per-node * total-nodes-in-cluster`
                    = 16 x 10 = 160
- `--executor-cores` = 1 (one executor per core)
- `--executor-memory` = `amount of memory per executor`
                      = `mem-per-node/num-executors-per-node`
                      = 64GB/16 = 4GB

With only one executor per core, we cannot take advantage of running multiple tasks in the same JVM. Also, shared/cached variables like broadcast variables and accumulators will be replicated in every executor, i.e. 16 times per node. In addition, we are not leaving enough memory overhead for the Hadoop/YARN daemon processes, and we are not accounting for the ApplicationMaster. NOT GOOD!

3.2 Second Approach: Fat executors (One Executor per node):

- `--num-executors` = `In this approach, we'll assign one executor per node`
                    = `total-nodes-in-cluster`
                    = 10
- `--executor-cores` = `one executor per node means all the cores of the node are assigned to one executor`
                     = `total-cores-in-a-node`
                     = 16
- `--executor-memory` = `amount of memory per executor`
                      = `mem-per-node/num-executors-per-node`
                      = 64GB/1 = 64GB

With all 16 cores per executor, the ApplicationMaster and daemon processes are again not accounted for, HDFS throughput will suffer, and it will result in excessive garbage collection. Also NOT GOOD!
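The arithmetic behind the two extremes above can be reproduced with a few lines of Python. The function names `tiny_executors` and `fat_executors` are hypothetical helpers for illustration, not part of Spark, and the numbers simply follow the example cluster (10 nodes, 16 cores and 64GB per node).

```python
def tiny_executors(nodes, cores_per_node, mem_per_node_gb):
    """One executor per core: many small single-task JVMs."""
    return {
        "num_executors": nodes * cores_per_node,
        "executor_cores": 1,
        # Node memory is divided evenly among the executors on that node.
        "executor_memory_gb": mem_per_node_gb // cores_per_node,
    }


def fat_executors(nodes, cores_per_node, mem_per_node_gb):
    """One executor per node: a single large JVM that takes every core."""
    return {
        "num_executors": nodes,
        "executor_cores": cores_per_node,
        "executor_memory_gb": mem_per_node_gb,
    }


tiny = tiny_executors(nodes=10, cores_per_node=16, mem_per_node_gb=64)
fat = fat_executors(nodes=10, cores_per_node=16, mem_per_node_gb=64)
print("tiny:", tiny)
print("fat: ", fat)
```

Neither helper reserves cores or memory for the operating system, the Hadoop/YARN daemons or the YARN application master, which is exactly the weakness of both approaches described above.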

3.3 Third Approach: Balance between Fat (vs) Tiny

Based on the recommendations mentioned above,

**1. Cores**
Let’s assign 5 cores per executor => `--executor-cores = 5 (for good HDFS throughput)`
Leave 1 core per node for Hadoop/YARN daemons => `Num cores available per node = 16-1 = 15`
So, total cores available in the cluster = 15 x 10 = 150

**2. Executors**
Number of available executors = `(total cores/num-cores-per-executor) = 150/5 = 30`
Leaving 1 executor for the ApplicationMaster => --num-executors = 29
Number of executors per node = 30/10 = 3
Memory per executor = 64GB/3 = 21GB
Counting off-heap overhead = 7% of 21GB ≈ 1.5GB, which this calculation rounds up to 3GB. So, actual --executor-memory = 21 - 3 = 18GB

So, the recommended config is: 29 executors, 18GB memory each and 5 cores each for the above 10-node cluster.
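The balanced calculation above can be sketched as a small Python function. The name `balanced_executors` and its parameters are hypothetical; the 3GB overhead reservation is taken from the text as a given rather than derived, and real clusters may need different reservations.

```python
def balanced_executors(nodes, cores_per_node, mem_per_node_gb,
                       cores_per_executor=5, reserved_cores_per_node=1,
                       overhead_gb=3):
    """Rule-of-thumb executor sizing for a YARN cluster (illustrative only)."""
    # Leave some cores on each node for the OS and Hadoop/YARN daemons.
    usable_cores = cores_per_node - reserved_cores_per_node
    executors_per_node = usable_cores // cores_per_executor
    # Keep one executor slot free for the YARN application master.
    num_executors = executors_per_node * nodes - 1
    # Split node memory among its executors, then subtract the off-heap overhead.
    mem_per_executor_gb = mem_per_node_gb // executors_per_node
    executor_memory_gb = mem_per_executor_gb - overhead_gb
    return {
        "num_executors": num_executors,
        "executor_cores": cores_per_executor,
        "executor_memory_gb": executor_memory_gb,
    }


config = balanced_executors(nodes=10, cores_per_node=16, mem_per_node_gb=64)
print(
    "spark-submit --master yarn --deploy-mode cluster "
    f"--num-executors {config['num_executors']} "
    f"--executor-cores {config['executor_cores']} "
    f"--executor-memory {config['executor_memory_gb']}G load_csv.py"
)
```

For the example cluster this reproduces the 29 executors, 5 cores and 18GB figures from the text; changing the inputs gives a starting point for other cluster sizes, to be refined by measuring the actual job.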

`--num-executors`, `--executor-cores` and `--executor-memory`: these three parameters play a very important role in Spark performance, as they control the amount of CPU and memory your Spark application gets. This makes it crucial for users to understand the right way to configure them.

Upvotes: 5
