Reputation: 764
I'm trying to run PySpark on Amazon EMR instances to read data from DynamoDB, and I would like to know how to set the number of splits and workers in my code.
I followed the instructions from the following two documents to come up with the code below, which currently connects to DynamoDB and reads the data: connecting to dynamoDB from pyspark and Pyspark documentation
from pyspark.context import SparkContext

sc = SparkContext.getOrCreate()

conf = {
    "dynamodb.servicename": "dynamodb",
    "dynamodb.input.tableName": "Table1",
    "dynamodb.endpoint": "https://dynamodb.us-east-1.amazonaws.com",
    "dynamodb.regionid": "us-east-1",
    "mapred.input.format.class": "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat",
    "mapred.output.format.class": "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat",
}

orders = sc.hadoopRDD(
    inputFormatClass="org.apache.hadoop.dynamodb.read.DynamoDBInputFormat",
    keyClass="org.apache.hadoop.io.Text",
    valueClass="org.apache.hadoop.dynamodb.DynamoDBItemWritable",
    conf=conf,
)
I was trying to change the executor instances and parallelism values on the SparkConf class, but I'm not sure how that affects the SparkContext variable:
SparkConf().set('spark.executor.instances','4')
SparkConf().set('spark.default.parallelism', '128')
I expected this to set the splits and reducers, but it doesn't seem to change anything.
Upvotes: 0
Views: 1615
Reputation: 31
I was trying to change the executor instances and parallelism values on the SparkConf class, but I'm not sure how that affects the SparkContext variable
Unless the SparkConf object has been passed during initialization of the SparkContext, as in:
conf = SparkConf() \
    .set('spark.executor.instances', '4') \
    .set('spark.default.parallelism', '128')
sc = SparkContext(conf=conf)
the settings won't take effect. If an existing context is fetched with getOrCreate:
sc = SparkContext()
...
sc = SparkContext.getOrCreate(conf=conf)
only some of the settings (not spark.default.parallelism) will be applied.
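A minimal sketch of the point above (assuming a plain PySpark session; the settings are the ones from the question):

from pyspark import SparkConf, SparkContext

conf = SparkConf() \
    .set('spark.executor.instances', '4') \
    .set('spark.default.parallelism', '128')

# If a context already exists, getOrCreate just returns it and
# spark.default.parallelism from conf is not applied to it.
sc = SparkContext.getOrCreate(conf=conf)

# To make the settings take effect, stop the old context and
# create a fresh one with the conf passed up front.
sc.stop()
sc = SparkContext(conf=conf)

# Read the value back from the live context to confirm.
print(sc.getConf().get('spark.default.parallelism'))  # '128'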
would like to know how to set the number of splits
For RDD inputs, use the Hadoop configuration (mapred.min.split.size). See also: Number of Partitions of Spark Dataframe and Why does partition parameter of SparkContext.textFile not take effect?
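As a sketch, that split-size hint can be passed in the same job conf dictionary used in the question; whether the DynamoDB input format honours it depends on the connector, and the 128 MB value below is only an example:

conf = {
    "dynamodb.servicename": "dynamodb",
    "dynamodb.input.tableName": "Table1",
    "dynamodb.endpoint": "https://dynamodb.us-east-1.amazonaws.com",
    "dynamodb.regionid": "us-east-1",
    "mapred.input.format.class": "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat",
    "mapred.output.format.class": "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat",
    # Minimum split size in bytes; larger values generally mean fewer splits.
    "mapred.min.split.size": "134217728",
}

orders = sc.hadoopRDD(
    inputFormatClass="org.apache.hadoop.dynamodb.read.DynamoDBInputFormat",
    keyClass="org.apache.hadoop.io.Text",
    valueClass="org.apache.hadoop.dynamodb.DynamoDBItemWritable",
    conf=conf,
)
print(orders.getNumPartitions())  # number of splits actually produced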
Upvotes: 3
Reputation: 1676
The way I usually change the SparkConf is this:
from pyspark import SparkContext
from pyspark import SparkConf

sconf = SparkConf()
sconf.set("spark.default.parallelism", 200)
# The conf has to be passed when the context is created.
sc = SparkContext(conf=sconf)
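A quick check (just a sketch) that the value was actually picked up by the running context:

# Read the setting back from the live context; expect '200'.
print(sc.getConf().get("spark.default.parallelism"))
print(sc.defaultParallelism)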
Upvotes: 0