Reputation: 1719
Deployment info: "pyspark --master yarn-client --num-executors 16 --driver-memory 16g --executor-memory 2g "
I am turning a 100,000-line text file (stored in HDFS) into an RDD with corpus = sc.textFile("my_file_name"). When I execute corpus.count() I do get 100000. I realize that all these steps are performed on the master node.
Now, my question is: when I perform an operation like new_corpus = corpus.map(some_function), will the job automatically be distributed by pyspark among all available slaves (16 in my case)? Or do I have to specify something?
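To make this concrete, here is a minimal sketch of what I am doing (some_function is just a stand-in for my real per-line function):

    # shell started with: pyspark --master yarn-client --num-executors 16 ...
    corpus = sc.textFile("my_file_name")   # sc is the SparkContext the pyspark shell provides
    print(corpus.count())                  # prints 100000

    def some_function(line):
        return line.upper()                # stand-in for the real per-line processing

    # map() is lazy: it only records the transformation here.
    # some_function is applied once per line (100,000 calls in total), not once per partition.
    new_corpus = corpus.map(some_function)
    print(new_corpus.count())              # this action triggers the distributed job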
Notes:
- When I execute new_corpus.count(), what prints out is [Stage some_number:> (0+2)/2], not [Stage some_number:> (0+16)/16].
- I am not sure that corpus = sc.textFile("my_file_name", 16) is the solution for me, because the function I want to apply works at the line level and therefore should be applied 100,000 times (the goal of parallelization is to speed up this process, e.g. each slave taking roughly 100000/16 lines). It should not be applied 16 times on 16 subsets of the original text file.

Upvotes: 3
Views: 3726
Reputation: 2628
First off, I saw yarn-client and a chill ran down my spine.
Is there a reason why you want the node where you submit your job to be running the driver? Why not let Yarn do its thing?
But about your question:
"I realize that all these steps are performed on the master node."
No, they are not. You might be misled by the fact that you are running your driver on the node you are connected to (see my spine-chill ;) ).
You tell Yarn to start up 16 executors for you, and Yarn will do so, taking rack and data locality into account to the best of its ability. These executors run in parallel.
Yarn is a resource manager; it manages the resources so you don't have to. All you have to specify with Spark is the number of executors you want and the memory Yarn should assign to the executors and the driver.
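For example, a submission along these lines (just a sketch; my_script.py is a placeholder for your application) lets Yarn place the driver inside the cluster as well, and the only Spark-specific knobs are the executor count and the memory:

    spark-submit \
        --master yarn \
        --deploy-mode cluster \
        --driver-memory 16g \
        --num-executors 16 \
        --executor-memory 2g \
        my_script.py

In yarn-client mode the driver keeps running on the machine you submit from; in cluster mode Yarn starts it on one of the cluster nodes, which is usually what you want for non-interactive jobs.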
Update: I have added an image to clarify how spark-submit (in cluster mode) works
Upvotes: 1
Reputation: 2392
Your observations are not really correct. Stages are not "executors". In Spark we have jobs, stages, and tasks: a job is kicked off by the driver, tasks are assigned to different worker nodes, and a stage is a collection of tasks that share the same shuffle dependencies. In your case shuffling happens only once.
To check whether you really have 16 executors, look at the resource manager's UI (for YARN usually port 8088) or the Spark application UI (usually port 4040 on the driver).
Also, if you use rdd.map(), it parallelizes according to the partitions you defined (for example in sc.textFile("my_file_name", numPartitions)), not according to the number of executors you set.
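As a quick sketch of what that means in practice (reusing the names from your question; the numbers are only examples):

    corpus = sc.textFile("my_file_name", 16)   # request at least 16 partitions
    print(corpus.getNumPartitions())           # check how many partitions you actually got

    # A map stage is split into one task per partition, not one task per executor.
    # If the RDD ended up with too few partitions (like the 2 you saw), repartition first:
    corpus = corpus.repartition(16)
    new_corpus = corpus.map(some_function)
    new_corpus.count()                         # the stage progress should now end in /16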
Here is an overview again: https://spark.apache.org/docs/1.6.0/cluster-overview.html
Upvotes: 3