Asha

Reputation: 4311

How to get the file name for a record in a Spark RDD (JavaRDD)

I am loading multiple files into a JavaRDD using

JavaRDD<String> allLines = sc.textFile("hdfs://path/*.csv");

After loading the files I modify each record and want to save them. However, I also need to save the original file name (ID) with each record for future reference. Is there any way to get the original file name for an individual record in the RDD? Thanks.

Upvotes: 6

Views: 6151

Answers (3)

szhem

Reputation: 4712

You can try something like the following snippet:

import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.api.java.JavaNewHadoopRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

// read the files through the new Hadoop API so that the input split
// (and therefore the file name) is accessible per partition
JavaPairRDD<LongWritable, Text> javaPairRDD = sc.newAPIHadoopFile(
    "hdfs://path/*.csv",
    TextInputFormat.class,
    LongWritable.class,
    Text.class,
    new Configuration()
);
JavaNewHadoopRDD<LongWritable, Text> hadoopRDD = (JavaNewHadoopRDD<LongWritable, Text>) javaPairRDD;

JavaRDD<Tuple2<String, String>> namedLinesRDD = hadoopRDD.mapPartitionsWithInputSplit((inputSplit, lines) -> {
    // each partition corresponds to a single input split, so the file name is constant within it
    FileSplit fileSplit = (FileSplit) inputSplit;
    String fileName = fileSplit.getPath().getName();

    Stream<Tuple2<String, String>> stream =
        StreamSupport.stream(Spliterators.spliteratorUnknownSize(lines, Spliterator.ORDERED), false)
            .map(line -> {
                String lineText = line._2().toString();
                // emit the file name as the key and the line as the value
                return new Tuple2<>(fileName, lineText);
            });
    return stream.iterator();
}, true);
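
If you then want to modify the records and persist them together with their source file name, a minimal follow-up sketch (the output path and the comma formatting here are illustrative only):

// prefix each (possibly modified) line with the name of the file it came from
JavaRDD<String> tagged = namedLinesRDD.map(t -> t._1() + "," + t._2());
tagged.saveAsTextFile("hdfs://path/output");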

Update (for Java 7)

// the same logic without lambdas; additionally requires java.util.Iterator,
// org.apache.hadoop.mapreduce.InputSplit and org.apache.spark.api.java.function.Function2
JavaRDD<Tuple2<String, String>> namedLinesRDD = hadoopRDD.mapPartitionsWithInputSplit(
    new Function2<InputSplit, Iterator<Tuple2<LongWritable, Text>>, Iterator<Tuple2<String, String>>>() {
        @Override
        public Iterator<Tuple2<String, String>> call(InputSplit inputSplit, final Iterator<Tuple2<LongWritable, Text>> lines) throws Exception {
            FileSplit fileSplit = (FileSplit) inputSplit;
            final String fileName = fileSplit.getPath().getName();
            // wrap the underlying iterator, pairing every line with its file name
            return new Iterator<Tuple2<String, String>>() {
                @Override
                public boolean hasNext() {
                    return lines.hasNext();
                }
                @Override
                public Tuple2<String, String> next() {
                    Tuple2<LongWritable, Text> entry = lines.next();
                    return new Tuple2<String, String>(fileName, entry._2().toString());
                }
            };
        }
    },
    true
);

Upvotes: 9

Gillespie

Reputation: 2228

You should be able to use toDebugString. wholeTextFiles reads the entire content of a file as one element, whereas sc.textFile creates an RDD with each line as an individual element, as described here; a short sketch of this difference follows the example below.

For example:

val file = sc.textFile("/user/user01/whatever.txt").cache()
val wordcount = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

wordcount.toDebugString
// res0: String =
// (2) ShuffledRDD[4] at reduceByKey at <console>:23 []
// +-(2) MapPartitionsRDD[3] at map at <console>:23 []
//    |  MapPartitionsRDD[2] at flatMap at <console>:23 []
//    |  /user/user01/whatever.txt MapPartitionsRDD[1] at textFile at <console>:21 []
//    |  /user/user01/whatever.txt HadoopRDD[0] at textFile at <console>:21 []
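
To make the difference concrete, here is a minimal Java sketch (assuming sc is a JavaSparkContext; the path is illustrative):

// textFile: one RDD element per line, across all matched files
JavaRDD<String> lines = sc.textFile("hdfs://a-hdfs-path");
// wholeTextFiles: one (path, content) pair per file
JavaPairRDD<String, String> files = sc.wholeTextFiles("hdfs://a-hdfs-path");
System.out.println(lines.count());  // total number of lines
System.out.println(files.count());  // number of files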

Upvotes: 0

dpeacock

Reputation: 2747

You want Spark's wholeTextFiles function. From the documentation:

For example, if you have the following files:

   hdfs://a-hdfs-path/part-00000
   hdfs://a-hdfs-path/part-00001
   ...
   hdfs://a-hdfs-path/part-nnnnn

Do val rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path"),

then rdd contains

   (a-hdfs-path/part-00000, its content)
   (a-hdfs-path/part-00001, its content)
   ...
   (a-hdfs-path/part-nnnnn, its content)

It returns an RDD of tuples in which the left element is the file name and the right element is the file's content.
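
Since the question uses the Java API, here is a rough Java sketch of the same idea (assuming Spark 2.x, where flatMap expects a function that returns an iterator; the path and the tab separator are illustrative):

import java.util.Arrays;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;

// one (path, content) pair per file
JavaPairRDD<String, String> files = sc.wholeTextFiles("hdfs://a-hdfs-path");

// split each file's content into lines, keeping the file name with every record
JavaRDD<String> namedLines = files.flatMap(pair ->
    Arrays.stream(pair._2().split("\n"))
          .map(line -> pair._1() + "\t" + line)
          .iterator()
);

Keep in mind that wholeTextFiles materializes each file as a single string, so it works best with many small files.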

Upvotes: 4
