nonoDa
nonoDa

Reputation: 453

Spark load from Elasticsearch: number of executor and partitions

I'm trying to load data from an Elasticsearch index into a dataframe in Spark. My machine has 12 CPU's and 1 core. I'm using PySpark on a Jupyter Notebook with the following Spark config:

pathElkJar = currentUserFolder+"/elasticsearch-hadoop-"+connectorVersion+"/dist/elasticsearch- spark-20_2.11-"+connectorVersion+".jar"

spark = SparkSession.builder \
    .appName("elastic") \
    .config("spark.jars",pathElkJar) \
    .enableHiveSupport() \
    .getOrCreate()

Now whether I do:

df = es_reader.load()

or:

df = es_reader.load(numPartitions=12)

I get the same output from the following prints:

print('Master: {}'.format(spark.sparkContext.master))
print('Number of partitions: {}'.format(df.rdd.getNumPartitions()))
print('Number of executors:{}'.format(spark.sparkContext._conf.get('spark.executor.instances')))
print('Partitioner: {}'.format(df.rdd.partitioner))
print('Partitions structure: {}'.format(df.rdd.glom().collect()))

Master: local[*]
Number of partitions: 1
Number of executors: None
Partitioner: None

I was expecting 12 partitions, which I can only obtain by doing a repartition() on the dataframe. Furthermore I thought that the number of executors by default equals the number of CPU's. But even by doing the following:

spark.conf.set("spark.executor.instances", "12")

I can't manually set the number of executors. It is true I have 1 core for each of the 12 CPU's, but how should I go about it?

I modified the configuration file after creating the Spark session (without restarting this obviously leads to no changes), by specifying the number of executor as follows:

spark = SparkSession.builder \
    .appName("elastic") \
    .config("spark.jars",pathElkJar) \
    .config("spark.executor.instances", "12") \
    .enableHiveSupport() \
    .getOrCreate()

I now correctly get 12 executors. Still I don't understand why it doesn't do it automatically and still the number of partitions when loading the dataframe is 1. I would expect it to be 12 as the number of executors, am I right?

Upvotes: 0

Views: 542

Answers (1)

nonoDa
nonoDa

Reputation: 453

The problem regarding the executors and partitioning arised from the fact that i was using spark in local mode which allows for one executor maximum. Using Yarn or other resource managers such as mesos solved the problem

Upvotes: 0

Related Questions