Reputation: 347
I have approximately 200 files in a single directory on a Linux machine named part-0001
, part-0002
, and so on. Each has approximately one million rows with the same columns (call them 'a', 'b', and so on). Let the pair 'a','b' be the key for each row (with many duplicates).
At the same time, I have set up a Spark 2.2.0 cluster with a master and two slaves with a total of 42 cores available. The address is spark://XXX.YYY.com:7077
.
I then use PySpark to connect to the cluster and compute the counts across the 200 files for each unique pair as follows.
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd
sc = SparkContext("spark://XXX.YYY.com:7077")
sqlContext = SQLContext(sc)
data_path = "/location/to/my/data/part-*"
sparkdf = sqlContext.read.csv(path=data_path, header=True)
dfgrouped = sparkdf.groupBy(['a','b'])
counts_by_group = dfgrouped.count()
This works in that I can see Spark progressing through a series of messages and it does indeed return results that look plausible.
Problem: While this calculation is being performed top does not show any evidence that the slave cores are doing anything. There doesn't appear to be any parallelization. Each slave has a single related Java process that was there before the job (plus processes from other users and background system processes). So it appears that the master is doing all the work. Given that there are 200 odd files, I had expected to see 21 processes running on each slave machine until things wound down (this is what I see when I explicitly invoke parallelize
as follows count = sc.parallelize(c=range(1, niters + 1), numSlices=ncores).map(f).reduce(add)
in a separate implementation).
Questions: How do I ensure that Spark is actually parallelizing the count? I would like each core to grab one or more files, perform the count for the pairs it sees in the file, and then have the individual results reduced into a single DataFrame
. Shouldn't I see this in top? Do I need to explicitly invoke parallelization?
(FWIW, I have seen example using partitioning, but my understanding is that this is used to distribute processing on chunks of a single file. My case is that I have many files.)
Thanks in advance.
Upvotes: 3
Views: 2460
Reputation: 330063
TL;DR There is probably nothing wrong with your deployment.
I had expected to see 21 processes running
Unless you specifically configured Spark to use a single core per executor JVM, there is no reason for this to happen. Unlike RDD
example you've mentioned in the question, DataFrame
API doesn't use Python workers at all, with exception to Python UserDefinedFunctions
.
At the same time, JVM executors use threading instead of full-fledged system processes (PySpark uses the later one to avoid GIL). Furthermore default spark.executor.cores
in standalone mode is equal to the number of the available cores on the worker. So without additional configuration you should see two executor JVMs, each using 21 data processing threads.
Overall you should check Spark UI, if you see tasks assigned to the executors, all should be fine.
Upvotes: 2