Reputation: 453
I'm trying to load data from an Elasticsearch index into a dataframe in Spark. My machine has 12 CPUs with 1 core each. I'm using PySpark in a Jupyter Notebook with the following Spark config:
pathElkJar = currentUserFolder+"/elasticsearch-hadoop-"+connectorVersion+"/dist/elasticsearch-spark-20_2.11-"+connectorVersion+".jar"
spark = SparkSession.builder \
.appName("elastic") \
.config("spark.jars",pathElkJar) \
.enableHiveSupport() \
.getOrCreate()
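For context, the es_reader used below is built roughly like this (host, port and index name are placeholders, since the exact reader setup isn't shown here):
# elasticsearch-spark-20_2.11 connector; connection details are placeholders
es_reader = spark.read \
    .format("org.elasticsearch.spark.sql") \
    .option("es.nodes", "localhost") \
    .option("es.port", "9200") \
    .option("es.resource", "my_index")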
Now whether I do:
df = es_reader.load()
or:
df = es_reader.load(numPartitions=12)
I get the same output from the following prints:
print('Master: {}'.format(spark.sparkContext.master))
print('Number of partitions: {}'.format(df.rdd.getNumPartitions()))
print('Number of executors:{}'.format(spark.sparkContext._conf.get('spark.executor.instances')))
print('Partitioner: {}'.format(df.rdd.partitioner))
print('Partitions structure: {}'.format(df.rdd.glom().collect()))
Master: local[*]
Number of partitions: 1
Number of executors: None
Partitioner: None
I was expecting 12 partitions, which I can only obtain by calling repartition() on the dataframe (see the snippet further below). Furthermore, I thought that the number of executors by default equals the number of CPUs. But even by doing the following:
spark.conf.set("spark.executor.instances", "12")
I can't manually set the number of executors. It is true that I have 1 core for each of the 12 CPUs, but how should I go about it?
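For reference, the explicit repartition that does give me 12 partitions looks like this:
# repartition() shuffles the loaded dataframe into 12 partitions
df = es_reader.load().repartition(12)
print(df.rdd.getNumPartitions())  # prints 12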
I then modified the configuration and recreated the Spark session (changing it after the session is created, without restarting, obviously leads to no changes), specifying the number of executors as follows:
spark = SparkSession.builder \
.appName("elastic") \
.config("spark.jars",pathElkJar) \
.config("spark.executor.instances", "12") \
.enableHiveSupport() \
.getOrCreate()
I now correctly get 12 executors. Still, I don't understand why this doesn't happen automatically, and the number of partitions when loading the dataframe is still 1. I would expect it to be 12, the same as the number of executors; am I right?
Upvotes: 0
Views: 542
Reputation: 453
The problem regarding the executors and partitioning arose from the fact that I was using Spark in local mode, which allows for one executor at most. Using YARN or another resource manager such as Mesos solved the problem.
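For example, the builder from the question can be pointed at YARN instead of local mode, roughly along these lines (the cluster-specific settings, such as memory and queue, depend on your resource manager):
# same builder as before, but targeting YARN so multiple executors can be allocated
spark = SparkSession.builder \
    .appName("elastic") \
    .master("yarn") \
    .config("spark.jars", pathElkJar) \
    .config("spark.executor.instances", "12") \
    .enableHiveSupport() \
    .getOrCreate()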
Upvotes: 0