Reputation: 57
I want to understand how sc.textFile
works in detail.
I have found the textFile source code in SparkContext.scala, but it contains a lot of information about the scheduler, stages, and task submission. What I want to know is how sc.textFile reads files from HDFS and how it uses wildcards to match multiple files.
Where can I find the source code?
Upvotes: 1
Views: 8653
Reputation: 1228
textFile is a method of the org.apache.spark.SparkContext class that reads a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and returns it as an RDD of Strings.
sc.textFile(path, minPartitions)
> @param path path to the text file on a supported file system
> @param minPartitions suggested minimum number of partitions for the resulting RDD
> @return RDD of lines of the text file
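For example, a minimal call looks like this (a sketch; the paths are hypothetical, and the second argument is the optional minPartitions hint):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val sc = new SparkContext(new SparkConf().setAppName("textFile-demo").setMaster("local[*]"))

// A single file, a whole directory, or a glob pattern are all accepted;
// the pattern is expanded by the Hadoop FileSystem layer, not by Spark itself.
val oneFile: RDD[String] = sc.textFile("hdfs:///data/input.txt")
val manyFiles: RDD[String] = sc.textFile("hdfs:///data/logs/*.log", 4) // 4 = minPartitions hint
println(manyFiles.count())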
Internally it uses HadoopRDD (an RDD that provides core functionality for reading data stored in Hadoop).
HadoopRDD looks like this:
HadoopRDD(
  sc,                      // SparkContext
  confBroadcast,           // a general Hadoop Configuration, or a subclass of it
  Some(setInputPathsFunc), // optional closure used to initialize any JobConf that HadoopRDD creates
  inputFormatClass,
  keyClass,
  valueClass,
  minPartitions)
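One public way to reach this constructor is sc.hadoopRDD, where you build the JobConf (and set its input paths, wildcards included) yourself. A minimal sketch, assuming an existing SparkContext named sc and a hypothetical HDFS path:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}

val jobConf = new JobConf()
// This is essentially what the setInputPathsFunc closure does for textFile;
// glob patterns such as *.txt are allowed here.
FileInputFormat.setInputPaths(jobConf, "hdfs:///data/logs/*.txt")

val pairs = sc.hadoopRDD(
  jobConf,
  classOf[TextInputFormat], // how files are split into records
  classOf[LongWritable],    // key type produced by the InputFormat
  classOf[Text],            // value type produced by the InputFormat
  4)                        // minPartitions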
In the textFile method we create a HadoopRDD with some hardcoded values:
HadoopRDD(
  sc,                      // SparkContext
  confBroadcast,           // a general Hadoop Configuration, or a subclass of it
  Some(setInputPathsFunc), // optional closure used to initialize any JobConf that HadoopRDD creates
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text],
  minPartitions)
Because of these hardcoded values, textFile can only read text files; to read any other type of file we use HadoopRDD with a different InputFormat, as in the sketch below.
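As a hedged sketch of how the pieces fit together (assuming an existing SparkContext named sc and hypothetical paths), the call above is roughly what sc.textFile builds through the public sc.hadoopFile API, and swapping the InputFormat class is how other file types are read:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{SequenceFileInputFormat, TextInputFormat}
import org.apache.spark.rdd.RDD

// Roughly equivalent to sc.textFile(path, minPartitions): the hardcoded classes
// from above, plus a map that keeps only the line (the value) as a String.
def textFileByHand(path: String, minPartitions: Int): RDD[String] =
  sc.hadoopFile(path,
      classOf[TextInputFormat],
      classOf[LongWritable],  // key: byte offset of the line in the file
      classOf[Text],          // value: the line itself
      minPartitions)
    .map(pair => pair._2.toString)

// Reading a different file type just means passing a different InputFormat,
// e.g. a SequenceFile of (LongWritable, Text) records (hypothetical path):
val seq = sc.hadoopFile("hdfs:///data/events.seq",
  classOf[SequenceFileInputFormat[LongWritable, Text]],
  classOf[LongWritable],
  classOf[Text],
  2)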
Upvotes: 2
Reputation: 1
The compute function in core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala does the actual reading.
Here is some code from that function:
var reader: RecordReader[K, V] = null
val inputFormat = getInputFormat(jobConf) // e.g. TextInputFormat for sc.textFile
// Attach stage/task information for this partition to the JobConf.
HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
  context.stageId, theSplit.index, context.attemptNumber, jobConf)
// Open a RecordReader for this task's input split.
reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
// Register an on-task-completion callback to close the input stream.
context.addTaskCompletionListener{ context => closeIfNeeded() }
val key: K = reader.createKey()     // for TextInputFormat: LongWritable (byte offset)
val value: V = reader.createValue() // for TextInputFormat: Text (the line)
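After this setup, compute wraps the reader in an iterator that repeatedly calls reader.next(key, value) until the split is exhausted. The same read loop can be driven outside Spark with just the Hadoop client classes; here is a self-contained sketch (the path is a hypothetical example) showing what a single task effectively does for its split:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, Reporter, TextInputFormat}

object RecordReaderDemo {
  def main(args: Array[String]): Unit = {
    val jobConf = new JobConf()
    FileInputFormat.setInputPaths(jobConf, new Path("/tmp/input/*.txt")) // globs are allowed

    val inputFormat = new TextInputFormat()
    inputFormat.configure(jobConf)

    // Ask the InputFormat for splits, then pull (key, value) records from each one,
    // the same pattern HadoopRDD.compute follows for its single split.
    for (split <- inputFormat.getSplits(jobConf, 2)) {
      val reader = inputFormat.getRecordReader(split, jobConf, Reporter.NULL)
      val key = reader.createKey()     // LongWritable: byte offset of the line
      val value = reader.createValue() // Text: the line contents
      while (reader.next(key, value)) {
        println(s"${key.get}\t${value.toString}")
      }
      reader.close()
    }
  }
}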
Upvotes: 0
Reputation: 27455
Apache Spark uses the Hadoop client library for reading the file, so you have to read the hadoop-client source code to find out more.
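In particular, wildcard expansion happens on the Hadoop side: when FileInputFormat lists the input files, the patterns go through the FileSystem glob API before any splits are computed. A small sketch (the URI is a hypothetical example) of the same globbing done directly:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Expand a wildcard path the way the Hadoop client does when listing input files.
val conf = new Configuration()
val pattern = new Path("hdfs://namenode:8020/data/logs/2016-*/*.log") // hypothetical URI
val fs: FileSystem = pattern.getFileSystem(conf)
val matches = fs.globStatus(pattern) // one FileStatus per matching file; may be null if nothing matches
if (matches != null) matches.foreach(status => println(status.getPath))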
Upvotes: 3