Reputation: 5352
I'm just starting out with Spark & Scala. I have a directory with multiple files in it, and I successfully load them using
sc.wholeTextFiles(directory)
Now I want to go one level up: I actually have a directory that contains subdirectories, which in turn contain files. My goal is to get an RDD[(String,String)], where each pair holds the name and content of a file, so I can move forward.
I tried the following:
val listOfFolders = getListOfSubDirectories(rootFolder)
val input = listOfFolders.map(directory => sc.wholeTextFiles(directory))
but I got a Seq[RDD[(String,String)]]. How do I transform this Seq into an RDD[(String,String)]?
Or maybe I'm not doing things right and I should try a different approach?
Edit: added code
// HADOOP VERSION
val rootFolderHDFS = "hdfs://****/"
val hdfsURI = "hdfs://****/**/"
// returns a list of folders (currently about 800)
val listOfFoldersHDFS = ListDirectoryContents.list(hdfsURI,rootFolderHDFS)
val inputHDFS = listOfFoldersHDFS.map(directory => sc.wholeTextFiles(directory))
// goal: a single RDD[(String,String)]
// val inputHDFS2 = inputHDFS.reduceRight((rdd1,rdd2) => rdd2 ++ rdd1)
val init = sc.parallelize(Array[(String, String)]())
val inputHDFS2 = inputHDFS.foldRight(init)((rdd1,rdd2) => rdd2 ++ rdd1)
// throws org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.StackOverflowError
println(inputHDFS2.count)
Upvotes: 4
Views: 6271
Reputation: 6871
You should use the union provided by SparkContext:
val rdds: Seq[RDD[Int]] = (1 to 100).map(i => sc.parallelize(Seq(i)))
val rdd_union: RDD[Int] = sc.union(rdds)
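Applied to the question's code, a minimal sketch (reusing its inputHDFS value, the Seq[RDD[(String, String)]] built from the folder list) would be:

// sc.union builds a single flat UnionRDD over all ~800 RDDs, avoiding the
// deeply nested lineage that a chain of ++ produces (the likely cause of the
// StackOverflowError during task serialization).
val inputHDFS2: RDD[(String, String)] = sc.union(inputHDFS)
println(inputHDFS2.count)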
Upvotes: 3
Reputation: 10931
Instead of loading each directory into a separate RDD, can you just use a path wildcard to load all directories into a single RDD?
Given the following directory tree...
$ tree test/spark/so
test/spark/so
├── a
│   ├── text1.txt
│   └── text2.txt
└── b
    ├── text1.txt
    └── text2.txt
Create the RDD with a wildcard for the directory.
scala> val rdd = sc.wholeTextFiles("test/spark/so/*/*")
rdd: org.apache.spark.rdd.RDD[(String, String)] = test/spark/so/*/* WholeTextFileRDD[16] at wholeTextFiles at <console>:37
Count is 4 as you would expect.
scala> rdd.count
res9: Long = 4
scala> rdd.collect
res10: Array[(String, String)] =
Array((test/spark/so/a/text1.txt,a1
a2
a3), (test/spark/so/a/text2.txt,a3
a4
a5), (test/spark/so/b/text1.txt,b1
b2
b3), (test/spark/so/b/text2.txt,b3
b4
b5))
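The same wildcard approach should carry over to the HDFS layout from the question; a sketch reusing its rootFolderHDFS placeholder, assuming the files sit exactly one subdirectory below the root:

// One wildcard per directory level; adjust the pattern to your actual depth.
val rddHDFS = sc.wholeTextFiles(rootFolderHDFS + "*/*")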
Upvotes: 2
Reputation: 9100
You can reduce on the Seq like this (concatenating the RDDs with ++):
val reduced: RDD[(String, String)] = input.reduce((left, right) => left ++ right)
A few more details on why we can apply reduce here:
- ++ is associative: it does not matter whether you do rdda ++ (rddb ++ rddc) or (rdda ++ rddb) ++ rddc.
- The Seq is nonempty (otherwise fold would be a better choice; it would require an empty RDD[(String, String)] as the initial accumulator).
- Depending on the exact type of the Seq, you might get a stack overflow, so be careful and test with a larger collection, though for the standard library I think it is safe.
Upvotes: 4