Reputation: 4311
I am loading multiple files into a JavaRDD using
JavaRDD<String> allLines = sc.textFile("hdfs://path/*.csv");
After loading the files I modify each record and want to save them. However, I also need to save the original file name (ID) with each record for future reference. Is there any way that I can get the original file name from an individual record in the RDD? Thanks.
Upvotes: 6
Views: 6151
Reputation: 4712
You can try to do something like in the following snippet:
JavaPairRDD<LongWritable, Text> javaPairRDD = sc.newAPIHadoopFile(
    "hdfs://path/*.csv",
    TextInputFormat.class,   // org.apache.hadoop.mapreduce.lib.input.TextInputFormat (new Hadoop API)
    LongWritable.class,
    Text.class,
    new Configuration()
);
JavaNewHadoopRDD<LongWritable, Text> hadoopRDD = (JavaNewHadoopRDD) javaPairRDD;

JavaRDD<Tuple2<String, String>> namedLinesRDD = hadoopRDD.mapPartitionsWithInputSplit((inputSplit, lines) -> {
    FileSplit fileSplit = (FileSplit) inputSplit;   // org.apache.hadoop.mapreduce.lib.input.FileSplit
    String fileName = fileSplit.getPath().getName();
    Stream<Tuple2<String, String>> stream =
        StreamSupport.stream(Spliterators.spliteratorUnknownSize(lines, Spliterator.ORDERED), false)
            .map(line -> {
                String lineText = line._2().toString();
                // emit file name as key and line as a value
                return new Tuple2<>(fileName, lineText);
            });
    return stream.iterator();
}, true);
The same thing without Java 8 lambdas, using an anonymous Function2:
JavaRDD<Tuple2<String, String>> namedLinesRDD = hadoopRDD.mapPartitionsWithInputSplit(
    new Function2<InputSplit, Iterator<Tuple2<LongWritable, Text>>, Iterator<Tuple2<String, String>>>() {
        @Override
        public Iterator<Tuple2<String, String>> call(InputSplit inputSplit, final Iterator<Tuple2<LongWritable, Text>> lines) throws Exception {
            FileSplit fileSplit = (FileSplit) inputSplit;
            final String fileName = fileSplit.getPath().getName();
            return new Iterator<Tuple2<String, String>>() {
                @Override
                public boolean hasNext() {
                    return lines.hasNext();
                }

                @Override
                public Tuple2<String, String> next() {
                    Tuple2<LongWritable, Text> entry = lines.next();
                    return new Tuple2<String, String>(fileName, entry._2().toString());
                }
            };
        }
    },
    true
);
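From there you can keep the file name alongside each record while you modify and save it. A minimal sketch of that follow-up (the uppercase transformation and the output path are only placeholders for your own logic):
JavaRDD<String> output = namedLinesRDD.map(pair -> {
    String fileName = pair._1();                 // original file the record came from
    String modified = pair._2().toUpperCase();   // placeholder for your real record modification
    return fileName + "," + modified;            // keep the file name (ID) with the record
});
output.saveAsTextFile("hdfs://path/output");     // placeholder output location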
Upvotes: 9
Reputation: 2228
You should be able to use toDebugString. Using wholeTextFiles will read in the entire content of each file as one element, whereas sc.textFile creates an RDD with each line as an individual element - as described here.
For example:
val file = sc.textFile("/user/user01/whatever.txt").cache()
val wordcount = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
wordcount.toDebugString
// res0: String =
// (2) ShuffledRDD[4] at reduceByKey at <console>:23 []
// +-(2) MapPartitionsRDD[3] at map at <console>:23 []
// | MapPartitionsRDD[2] at flatMap at <console>:23 []
// | /user/user01/whatever.txt MapPartitionsRDD[1] at textFile at <console>:21 []
// | /user/user01/whatever.txt HadoopRDD[0] at textFile at <console>:21 []
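The same check works from the Java API; for example, with the allLines RDD from the question:
// Prints the lineage; the HadoopRDD entries show which paths the data was read from.
System.out.println(allLines.toDebugString());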
Upvotes: 0
Reputation: 2747
You want Spark's wholeTextFiles function. From the documentation:
For example, if you have the following files:
hdfs://a-hdfs-path/part-00000
hdfs://a-hdfs-path/part-00001
...
hdfs://a-hdfs-path/part-nnnnn
Do val rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path"),
then rdd contains
(a-hdfs-path/part-00000, its content)
(a-hdfs-path/part-00001, its content)
...
(a-hdfs-path/part-nnnnn, its content)
It returns an RDD of tuples where the left element is the file path and the right is the file's content.
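Since the question uses the Java API, here is a minimal sketch of the same idea (the path is taken from the question, and tagging each line with its file name is only an illustration):
// Each element is a (file path, whole file content) pair.
JavaPairRDD<String, String> filesRDD = sc.wholeTextFiles("hdfs://path/*.csv");

JavaRDD<String> taggedLines = filesRDD.flatMap(pair -> {
    List<String> out = new ArrayList<>();
    for (String line : pair._2().split("\n")) {
        out.add(pair._1() + "," + line);   // keep the original file name with each record
    }
    return out.iterator();                 // Spark 2.x; on Spark 1.x flatMap expects an Iterable, so return out
});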
Upvotes: 4