Reputation: 2085
If I have a huge CSV file of 50GB on a single HDFS node and I am trying to read the file using spark.read as below:
file_df = spark.read.format('csv').option('header', 'true').option('inferSchema', 'true').load('/hdfspath/customer.csv')
and I am submitting the spark job using the below spark-submit:
spark-submit --master yarn --deploy-mode cluster --num-executors 4 --executor-memory 3G --executor-cores 5 --driver-memory 1G load_csv.py
I understand that until there is an action, Spark will not load any data into memory. But what happens when an action is triggered and the first thing to do is read the file into memory to kick-start the transformations? How does Spark read the 50GB file in small parts based on the cores and executors I mentioned?
For example: I mentioned 4 executors and 3GB of memory for each executor. While reading, will Spark split the main customer.csv file into 3GB chunks, one per executor, and load the file like this:
For the first 12GB:
Executor 1: 3GB
Executor 2: 3GB
Executor 3: 3GB
Executor 4: 3GB
and so on until the whole file has been processed?
Or will it split the file based on the HDFS block size (e.g. 128MB), read it block by block, and try to fit as many blocks as it can into each 3GB executor?
If a file is entirely present on a single node (which in my case it is), how will Spark process the file?
I understand it's a bit broad and cumbersome to explain, but any help would be greatly appreciated.
Upvotes: 1
Views: 2570
Reputation: 2072
IIUC, these are the well-known general practices for tuning Spark to process large datasets (and 50 GB is not really a huge dataset).
1. Partitions
Ans: Yes. Spark will create one partition per HDFS block (128 MB by default) for splittable file formats, and not partitions sized to the executor memory in this case.
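A quick way to verify this is to check how many partitions Spark actually created after the read (a minimal sketch reusing file_df from the question; with a 50 GB file and 128 MB blocks the count should be roughly 400):
file_df = spark.read.format('csv').option('header', 'true').option('inferSchema', 'true').load('/hdfspath/customer.csv')
print(file_df.rdd.getNumPartitions())  # roughly 50 GB / 128 MB ≈ 400 partitions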
2. StorageLevels: Memory & Disk
On cache() (which for an RDD is the same as persist(StorageLevel.MEMORY_ONLY); a DataFrame's cache() defaults to MEMORY_AND_DISK), Spark will store all partitions in memory - if they don't fit in memory you can get an OOM. If you call persist(StorageLevel.MEMORY_AND_DISK), it will store as much as it can in memory and put the rest on disk. If the data doesn't fit on disk either, the OS will usually kill your workers.
Note that Spark has its own memory management: some of the memory you assign to your Spark job is used to hold the data being worked on (execution), and some is used for storage if you call cache or persist.
from pyspark.storagelevel import StorageLevel

file_df = (spark.read.format('csv')
           .option('header', 'true')
           .option('inferSchema', 'true')
           .load('/hdfspath/customer.csv'))

# Keep what fits in memory and spill the rest to disk
file_df = file_df.persist(StorageLevel.MEMORY_AND_DISK)
# Alternative: keep everything on disk only
# file_df = file_df.persist(StorageLevel.DISK_ONLY)
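Once the cached data is no longer needed, it can be released explicitly (assuming the same file_df as above):
file_df.unpersist()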
Storage Level         Space used   CPU time   In memory   On-disk   Serialized   Recompute some partitions
-----------------------------------------------------------------------------------------------------------
MEMORY_ONLY           High         Low        Y           N         N            Y
MEMORY_ONLY_SER       Low          High       Y           N         Y            Y
MEMORY_AND_DISK       High         Medium     Some        Some      Some         N
MEMORY_AND_DISK_SER   Low          High       Some        Some      Y            N
DISK_ONLY             Low          High       N           Y         Y            N
3. Try the following options for the memory settings.
spark-submit --master yarn --deploy-mode cluster --num-executors 4 --executor-memory 3G --executor-cores 5 --driver-memory 3G load_csv.py
Let's say you have a 10-node cluster with the following configuration:
**Cluster Config:**
10 Nodes
16 cores per Node
64GB RAM per Node
3.1 First Approach: Tiny executors [One Executor per core]:
- `--num-executors` = `In this approach, we'll assign one executor per core`
= `total-cores-in-cluster`
= `num-cores-per-node * total-nodes-in-cluster`
= 16 x 10 = 160
- `--executor-cores` = 1 (one executor per core)
- `--executor-memory` = `amount of memory per executor`
= `mem-per-node/num-executors-per-node`
= 64GB/16 = 4GB
With only one executor per core, as discussed above, we won't be able to take advantage of running multiple tasks in the same JVM. Also, shared/cached variables like broadcast variables and accumulators will be replicated on each core of a node, i.e. 16 times. Also, we are not leaving enough memory overhead for the Hadoop/YARN daemon processes and we are not accounting for the ApplicationMaster. NOT GOOD!
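For illustration only, this tiny-executor layout would correspond to a submit line like the one below (a sketch reusing load_csv.py from the question; not a recommendation, for the reasons above):
spark-submit --master yarn --deploy-mode cluster --num-executors 160 --executor-cores 1 --executor-memory 4G --driver-memory 3G load_csv.py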
3.2 Second Approach: Fat executors (One Executor per node):
- `--num-executors` = `In this approach, we'll assign one executor per node`
= `total-nodes-in-cluster`
= 10
- `--executor-cores` = `one executor per node means all the cores of the node are assigned to one executor`
= `total-cores-in-a-node`
= 16
- `--executor-memory` = `amount of memory per executor`
= `mem-per-node/num-executors-per-node`
= 64GB/1 = 64GB
With all 16 cores per executor, apart from the ApplicationMaster and daemon processes not being accounted for, HDFS throughput will suffer and it will result in excessive garbage collection. Also, NOT GOOD!
3.3 Third Approach: Balance between Fat (vs) Tiny
Based on the recommendations mentioned above,
**1. Cores**
Let's assign 5 cores per executor => `--executor-cores = 5 (for good HDFS throughput)`
Leave 1 core per node for Hadoop/Yarn daemons => `Num cores available per node = 16-1 = 15`
So, total available cores in the cluster = 15 x 10 = 150
**2. Executors**
Number of available executors = `(total cores/num-cores-per-executor) = 150/5 = 30`
Leaving 1 executor for the ApplicationMaster => --num-executors = 29
Number of executors per node = 30/10 = 3
Memory per executor = 64GB/3 = 21GB
Counting off-heap overhead (spark.executor.memoryOverhead), leave roughly 3GB of the 21GB aside. So, actual --executor-memory = 21 - 3 = 18GB
So, the recommended config is: 29 executors, 18GB memory each and 5 cores each for the above 10-node cluster.
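For the 10-node example, that recommendation translates into a submit line roughly like this (again a sketch, reusing load_csv.py from the question):
spark-submit --master yarn --deploy-mode cluster --num-executors 29 --executor-cores 5 --executor-memory 18G --driver-memory 3G load_csv.py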
`--num-executors`, `--executor-cores` and `--executor-memory`: these three params play a very important role in Spark performance, as they control the amount of CPU & memory your Spark application gets. This makes it crucial for users to understand the right way to configure them.
Upvotes: 5