ScutterKey

Reputation: 57

How does Spark sc.textFile work in detail?

I want to understand how sc.textFile works in detail.
I have found the textFile source code in SparkContext.scala, but it contains a lot of information about the scheduler, stages, and task submission. What I want to know is how sc.textFile reads files from HDFS and how it uses wildcards to match multiple files.
Where can I find the source code?

Upvotes: 1

Views: 8653

Answers (3)

Abhash Kumar

Reputation: 1228

textFile is a method of the org.apache.spark.SparkContext class that reads a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and returns it as an RDD of Strings.

sc.textFile(path, minPartitions)

> @param path path to the text file on a supported file system  
> @param minPartitions suggested minimum number of partitions for the resulting RDD
> @return RDD of lines of the text file
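A minimal usage sketch (the glob path and the partition count below are illustrative values, not from the question):

    // Read one or more text files into an RDD[String], one element per line.
    // The second argument is only a *suggested* minimum number of partitions.
    val lines = sc.textFile("hdfs:///data/logs/*.txt", 4)
    println(lines.count())   // total number of lines across all matched files
    println(lines.first())   // first line of the first partition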

Internally it uses HadoopRDD (an RDD that provides the core functionality for reading data stored in Hadoop).

A HadoopRDD is constructed like this:

HadoopRDD(
      sc,                       // SparkContext
      confBroadcast,            // a broadcast Hadoop Configuration, or a subclass of it
      Some(setInputPathsFunc),  // optional closure used to initialize any JobConf that HadoopRDD creates
      inputFormatClass,
      keyClass,
      valueClass,
      minPartitions)

In the textFile method, Spark creates a HadoopRDD with some hardcoded values:

HadoopRDD(
      sc,                       // SparkContext
      confBroadcast,            // a broadcast Hadoop Configuration, or a subclass of it
      Some(setInputPathsFunc),  // optional closure used to initialize any JobConf that HadoopRDD creates
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text],
      minPartitions)

Because of these hardcoded values, textFile can only read plain text files; if you want to read any other type of file, you use hadoopRDD (or hadoopFile) directly.
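For reference, here is a sketch of what the textFile body boils down to (paraphrased from SparkContext.scala; the exact code varies between Spark versions), followed by a hypothetical example of pointing hadoopFile at a different InputFormat:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.{KeyValueTextInputFormat, TextInputFormat}
    import org.apache.spark.rdd.RDD

    // Roughly what sc.textFile(path, minPartitions) does: build a Hadoop RDD of
    // (byte offset, line) pairs with TextInputFormat, then keep only the line text.
    def textFileSketch(path: String, minPartitions: Int): RDD[String] =
      sc.hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
          minPartitions)
        .map(pair => pair._2.toString)

    // To read anything other than plain text, use another InputFormat,
    // e.g. tab-separated key/value text (the path here is illustrative):
    val kv = sc.hadoopFile[Text, Text, KeyValueTextInputFormat]("hdfs:///data/kv/*")
      .map { case (k, v) => (k.toString, v.toString) }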

Upvotes: 2

IKnow

Reputation: 1

Look at the compute function in core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala.

Here is some of the code from that function:

  // Look up the InputFormat and create a RecordReader for this split.
  var reader: RecordReader[K, V] = null
  val inputFormat = getInputFormat(jobConf)
  HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
    context.stageId, theSplit.index, context.attemptNumber, jobConf)
  reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)

  // Register an on-task-completion callback to close the input stream.
  context.addTaskCompletionListener{ context => closeIfNeeded() }
  val key: K = reader.createKey()
  val value: V = reader.createValue()
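Conceptually (this is a simplified sketch, not the literal Spark source), the iterator built around that reader then pulls records like this:

    // Simplified sketch of the read loop: reader.next(key, value) fills the reused
    // key/value objects with the next record (for TextInputFormat: the byte offset
    // of the line and the line's text) and returns false at the end of the split.
    var finished = false
    while (!finished) {
      finished = !reader.next(key, value)
      if (!finished) {
        // emit (key, value) to the task; Spark wraps this logic in a NextIterator
      }
    }
    reader.close()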

Upvotes: 0

Daniel Darabos

Reputation: 27455

Apache Spark uses the Hadoop client library to read the file, so you have to read the hadoop-client source code to find out more:

https://github.com/apache/hadoop/blob/release-2.7.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java
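As for the wildcard part of the question: glob expansion also happens on the Hadoop side. FileInputFormat lists its input paths with FileSystem#globStatus, so a pattern like /data/*.txt is expanded before splits are computed. A quick way to check what a pattern matches (illustrative sketch; the glob below is just an example):

    import org.apache.hadoop.fs.{FileSystem, Path}

    // Uses the same FileSystem.globStatus call that FileInputFormat relies on
    // when listing input paths.
    val fs = FileSystem.get(sc.hadoopConfiguration)
    fs.globStatus(new Path("hdfs:///data/logs/*.txt"))
      .foreach(status => println(status.getPath))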

Upvotes: 3
