Navjot Singh

Reputation: 764

How to set number of splits and reducers in pyspark

I'm trying to run pyspark on amazon EMR instances to read data from dynamodb and would like to know how to set the number of splits and workers in my code?

I followed the instructions from the following two documents to come up with the code below, which currently connects to DynamoDB and reads the data: connecting to DynamoDB from pyspark and the Pyspark documentation.

from pyspark.context import SparkContext
sc = SparkContext.getOrCreate()
conf = {"dynamodb.servicename": "dynamodb", "dynamodb.input.tableName":
       "Table1", "dynamodb.endpoint": "https://dynamodb.us-east-
        1.amazonaws.com", "dynamodb.regionid":"us-east-1", 
       "mapred.input.format.class": 
       "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat",
       "mapred.output.format.class": 
       "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat"
orders = sc.hadoopRDD(inputFormatClass="org.apache.hadoop.dynamodb.read.DynamoDBInputFormat",
                     keyClass="org.apache.hadoop.io.Text",
                      valueClass="org.apache.hadoop.dynamodb.DynamoDBItemWritable", conf=conf)

I tried changing the executor instances and parallelism values on the SparkConf class, but I'm not sure how that affects the SparkContext variable:

SparkConf().set('spark.executor.instances','4')
SparkConf().set('spark.default.parallelism', '128')

to set the splits and reducers, but it doesn't seem to have any effect.

Upvotes: 0

Views: 1615

Answers (2)

user9044803

Reputation: 31

I tried changing the executor instances and parallelism values on the SparkConf class, but I'm not sure how that affects the SparkContext variable

Unless the SparkConf object has been passed during initialization of the SparkContext:

conf = SparkConf()  \
    .set('spark.executor.instances','4') \
    .set('spark.default.parallelism', '128')

sc = SparkContext(conf=conf)

the settings won't take effect. If an existing context is fetched with getOrCreate:

sc = SparkContext()
...
sc = SparkContext.getOrCreate(conf=conf) 

only some settings (not spark.default.parallelism) will be applied.
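As a quick sanity check (a minimal sketch; getConf(), defaultParallelism, and the property names are standard Spark APIs and settings), you can inspect what the running context actually picked up:

from pyspark import SparkConf, SparkContext

conf = SparkConf() \
    .set('spark.executor.instances', '4') \
    .set('spark.default.parallelism', '128')

sc = SparkContext(conf=conf)

# Inspect the configuration the running context uses; settings changed on a
# throwaway SparkConf() after this point have no effect.
print(sc.getConf().get('spark.default.parallelism'))  # '128'
print(sc.defaultParallelism)                          # 128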

would like to know how to set the number of splits

For RDD inputs, use the Hadoop configuration (mapred.min.split.size); see Number of Partitions of Spark Dataframe and Why does partition parameter of SparkContext.textFile not take effect?
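A minimal sketch of that approach for the DynamoDB job from the question: the Hadoop keys go into the same conf dict that already feeds hadoopRDD. The split-size values below are illustrative, and whether the DynamoDB input format honors them depends on the connector version; file-based input formats do.

conf = {"dynamodb.servicename": "dynamodb",
        "dynamodb.input.tableName": "Table1",
        "dynamodb.endpoint": "https://dynamodb.us-east-1.amazonaws.com",
        "dynamodb.regionid": "us-east-1",
        "mapred.input.format.class": "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat",
        "mapred.output.format.class": "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat",
        # illustrative split sizes (64 MB / 128 MB)
        "mapred.min.split.size": str(64 * 1024 * 1024),
        "mapred.max.split.size": str(128 * 1024 * 1024)}

orders = sc.hadoopRDD(inputFormatClass="org.apache.hadoop.dynamodb.read.DynamoDBInputFormat",
                      keyClass="org.apache.hadoop.io.Text",
                      valueClass="org.apache.hadoop.dynamodb.DynamoDBItemWritable",
                      conf=conf)

# Check how many partitions (splits) the input actually produced.
print(orders.getNumPartitions())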

Upvotes: 3

user3689574

Reputation: 1676

The way I usually change the SparkConf is this:

from pyspark import SparkContext
from pyspark import SparkConf

sconf = SparkConf()
sconf.set("spark.default.parallelism", "200")

sc = SparkContext(conf=sconf)
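For the "reducers" part of the question, spark.default.parallelism only sets the default. A minimal sketch (the reduceByKey aggregation is hypothetical, not from the question) showing that individual shuffle operations can also take an explicit partition count:

# Hypothetical aggregation: reduce with an explicit number of shuffle
# partitions, overriding spark.default.parallelism for this one step.
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
counts = pairs.reduceByKey(lambda x, y: x + y, numPartitions=64)
print(counts.getNumPartitions())  # 64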

Upvotes: 0
