Daniel Darabos
Daniel Darabos

Reputation: 27455

How to set the number of partitions for newAPIHadoopFile?

The "old" SparkContext.hadoopFile takes a minPartitions argument, which is a hint for the number of partitions:

def hadoopFile[K, V](
  path: String,
  inputFormatClass: Class[_ <: InputFormat[K, V]],
  keyClass: Class[K],
  valueClass: Class[V],
  minPartitions: Int = defaultMinPartitions
  ): RDD[(K, V)]

But there is no such argument on SparkContext.newAPIHadoopFile:

def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
  path: String,
  fClass: Class[F],
  kClass: Class[K],
  vClass: Class[V],
  conf: Configuration = hadoopConfiguration): RDD[(K, V)]

In fact mapred.InputFormat.getSplits takes a hint argument, but mapreduce.InputFormat.getSplits takes a JobContext. What is the way to influence the number of splits through the new API?

I have tried setting mapreduce.input.fileinputformat.split.maxsize and fs.s3n.block.size on the Configuration object, but they had no effect. I am trying to load a 4.5 GB file from s3n, and it gets loaded in a single task.

https://issues.apache.org/jira/browse/HADOOP-5861 is relevant, but it suggests that I should already see more than one split, since the default block size is 64 MB.

Upvotes: 7

Views: 3092

Answers (1)

aaronman
aaronman

Reputation: 18750

The function newApiHadoopFile allows you to pass a configuration object so in that you can set mapred.max.split.size.

Even though this is in the mapred namespace since there is seemingly no new option I would imagine the new API will respect the variable.

Upvotes: 7

Related Questions