Reputation: 1595
I'm using Spark to read the data of all files from HDFS into a single RDD, from a directory and its sub-directories as well. I could not find an efficient method to do that, so I tried to write some custom code as shown below:
public Object fetch(String source, String sink) {
    // reading data
    boolean isDir = new File(source).isDirectory();
    System.out.println("isDir=" + isDir);
    JavaRDD<String> lines;
    if (isDir) {
        lines = readFiles(new File(source).listFiles(), null);
    } else {
        lines = sc.textFile(source);
    }
    lines.saveAsTextFile(sink);
    return true;
}
public static JavaRDD<String> readFiles(File[] files, JavaRDD<String> lines) {
    for (File file : files) {
        if (file.isDirectory()) {
            readFiles(file.listFiles(), lines); // calls the same method again
        } else {
            if (lines == null) {
                lines = sc.textFile(file.getPath());
            } else {
                JavaRDD<String> r = sc.textFile(file.getPath());
                lines.union(r);
            }
        }
    }
    return lines;
}
But this is not doing what I expect: isDir comes out as false, saying the path is not a directory. Can you guide me on what's wrong, and is there a more efficient way to do this? Thanks a lot.
Upvotes: 3
Views: 9215
Reputation: 1278
the "*" character reads the folder recursively
JavaSparkContext sc = new JavaSparkContext(conf);
sc.textFile("/my/directory/*");
Read this link for more info:
http://spark.apache.org/docs/latest/programming-guide.html#external-datasets
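For reference, the guide linked above notes that textFile also accepts comma-separated lists and wildcard patterns; a small sketch (the paths below are placeholders):

JavaRDD<String> txt = sc.textFile("/my/directory/*.txt");        // every .txt file in the folder
JavaRDD<String> nested = sc.textFile("/my/directory/*/*");       // files one directory level further down
JavaRDD<String> combined = sc.textFile("/my/dir1/*,/my/dir2/*"); // several locations at once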
Upvotes: 2
Reputation: 346
Since Spark can read data based on a Hadoop Job configuration, you can use the FileInputFormat#setInputDirRecursive method:
// new-API (org.apache.hadoop.mapreduce) classes are required for newAPIHadoopRDD
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

JavaSparkContext context = new JavaSparkContext();
Job job = null;
try {
    job = Job.getInstance();
    FileInputFormat.setInputPaths(job, new Path("/path/to/input/directory"));
    FileInputFormat.setInputDirRecursive(job, true);
} catch (IOException e1) {
    e1.printStackTrace();
    System.exit(1);
}
JavaRDD<Text> sourceData = context.newAPIHadoopRDD(job.getConfiguration(),
        TextInputFormat.class, LongWritable.class, Text.class).values();
Obviously you will end up with a Text data type instead of a String.
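If you need plain Strings, one option (a sketch building on the sourceData RDD above) is to convert each record right away; doing the conversion immediately also sidesteps Hadoop's habit of reusing the same Text instance across records:

// map each Text record to its String value
JavaRDD<String> sourceAsString = sourceData.map(text -> text.toString());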
Upvotes: 3
Reputation: 1595
So finally I found a solution. My mistake was using a java.io.File object, which reads files from the local file system. In order to read/write to HDFS we need to use org.apache.hadoop.fs.*.
So here is the solution:
public Object fetch(String source, String sink) {
    // reading data: treat the source as an HDFS path, not a local java.io.File
    Path src = new Path(source);
    try {
        if (fs.exists(src)) {
            FileStatus[] lists = fs.listStatus(src);
            JavaRDD<String> lines = readFiles(lists, null);
            if (lines != null) {
                lines.saveAsTextFile(sink);
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
    return true;
}
public JavaRDD<String> readFiles(FileStatus[] files, JavaRDD<String> lines) throws IOException {
    for (FileStatus file : files) {
        if (file.isDirectory()) {
            // recurse into the sub-directory
            lines = readFiles(fs.listStatus(file.getPath()), lines);
        } else {
            JavaRDD<String> r = sc.textFile(file.getPath().toString());
            // union() returns a new RDD, so the result must be kept
            lines = (lines == null) ? r : lines.union(r);
        }
    }
    return lines;
}
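For completeness, the fs and sc fields used above are assumed to be initialized elsewhere; a minimal setup sketch (the app name is just a placeholder):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// fs talks to whatever file system fs.defaultFS points at (HDFS on a cluster);
// sc is the usual Spark entry point.
SparkConf conf = new SparkConf().setAppName("hdfs-recursive-read");
JavaSparkContext sc = new JavaSparkContext(conf);
FileSystem fs = FileSystem.get(new Configuration());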
Upvotes: 0