Reputation: 27455
The "old" SparkContext.hadoopFile
takes a minPartitions
argument, which is a hint for the number of partitions:
    def hadoopFile[K, V](
        path: String,
        inputFormatClass: Class[_ <: InputFormat[K, V]],
        keyClass: Class[K],
        valueClass: Class[V],
        minPartitions: Int = defaultMinPartitions
    ): RDD[(K, V)]
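For comparison, a minimal call through the old API might look like this (the path and the partition count are just placeholders):

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat

    // Old API: hint at the desired partition count via minPartitions
    // (path and count are illustrative).
    val oldRdd = sc.hadoopFile(
      "s3n://some-bucket/big-file.txt",
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text],
      minPartitions = 64
    )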
But there is no such argument on SparkContext.newAPIHadoopFile:
    def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
        path: String,
        fClass: Class[F],
        kClass: Class[K],
        vClass: Class[V],
        conf: Configuration = hadoopConfiguration
    ): RDD[(K, V)]
In fact, mapred.InputFormat.getSplits takes a hint argument, but mapreduce.InputFormat.getSplits takes a JobContext. How can I influence the number of splits through the new API?
I have tried setting mapreduce.input.fileinputformat.split.maxsize and fs.s3n.block.size on the Configuration object, but they had no effect. I am trying to load a 4.5 GB file from s3n, and it gets loaded in a single task.
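Concretely, the attempt was along these lines (the 64 MB value, written out in bytes, is just what I used as an example):

    import org.apache.hadoop.conf.Configuration

    // What I tried: cap the split size and shrink the s3n block size on the
    // Configuration that gets passed to newAPIHadoopFile (values illustrative).
    val conf = new Configuration(sc.hadoopConfiguration)
    conf.set("mapreduce.input.fileinputformat.split.maxsize", (64 * 1024 * 1024).toString)
    conf.set("fs.s3n.block.size", (64 * 1024 * 1024).toString)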
https://issues.apache.org/jira/browse/HADOOP-5861 is relevant, but it suggests that I should already see more than one split, since the default block size is 64 MB.
Upvotes: 7
Views: 3092
Reputation: 18750
The function newAPIHadoopFile allows you to pass in a Configuration object, so you can set mapred.max.split.size there.
Even though this is in the mapred namespace, since there is seemingly no new option, I would imagine the new API will still respect that variable.
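A rough sketch of what I mean, with a placeholder path and an example 64 MB cap:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

    // Cap each split at 64 MB via the configuration handed to newAPIHadoopFile.
    val conf = new Configuration(sc.hadoopConfiguration)
    conf.set("mapred.max.split.size", (64 * 1024 * 1024).toString)

    val rdd = sc.newAPIHadoopFile(
      "s3n://some-bucket/big-file.txt",   // placeholder path
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text],
      conf
    )

Smaller values of that property should translate into more splits, and therefore more tasks, when the file is read.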
Upvotes: 7