Reputation: 11
Usually we give one text file as input to the Java program (say, in the case of a simple word-count problem). Instead, I now have 100 CSV files that I want to give as input to my Java code. (The files can't simply be merged into one: I am trying to predict the max/min stock volatility of the given 100 stocks, so each CSV file is unique.) So, how do I give the entire folder of CSV files as the input stream to the Java program?
Upvotes: 1
Views: 1644
Reputation: 424
Solution 1: For solving this, we can use the FileInputFormat.addInputPaths() method, which takes a comma-separated list of multiple inputs, and we can write it as
FileInputFormat.addInputPaths(job, "file0,file1,...");
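For instance, a minimal driver sketch (the class names StockDriver, StockMapper, StockReducer and the file names are placeholders, not from the question):
Job job = Job.getInstance(new Configuration(), "stock volatility");
job.setJarByClass(StockDriver.class);
job.setMapperClass(StockMapper.class);
job.setReducerClass(StockReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
// the comma-separated list can be built from args or from a directory listing
FileInputFormat.addInputPaths(job, "stocks/file0.csv,stocks/file1.csv,stocks/file2.csv");
FileOutputFormat.setOutputPath(job, new Path("output"));
System.exit(job.waitForCompletion(true) ? 0 : 1);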
or
Suppose two files need to be analysed to produce a list of the people who are using the services of Facebook and YouTube (we need a single output file out of these).
We have two files, facebook.txt and youtube.txt:
Path youtubePath = new Path(args[0]);
Path facebookPath = new Path(args[1]);
Path outputPath = new Path(args[2]);
// each input path is paired with its own InputFormat and Mapper class
MultipleInputs.addInputPath(job, facebookPath, TextInputFormat.class, JoinFacebookMapper.class);
MultipleInputs.addInputPath(job, youtubePath, TextInputFormat.class, YoutubeMapper.class);
FileOutputFormat.setOutputPath(job, outputPath);
Adding these lines to the driver lets multiple files be processed within a single MapReduce job; a rough sketch of one of the mappers follows.
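One of the per-source mappers might look like this (a minimal sketch; the comma-separated layout of facebook.txt is an assumption):
public static class JoinFacebookMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // assumed layout: the user name is the first comma-separated field;
        // tag each record with its source so the reducer can join them
        String userName = value.toString().split(",")[0];
        context.write(new Text(userName), new Text("facebook"));
    }
}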
or
You can pass the entire folder as an argument: FileInputFormat treats a directory path as "all the files inside it", so a sketch like the one below works (assuming the folder path arrives as args[0]).
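// args[0] is the folder containing all 100 csv files;
// every file inside it becomes part of the job's input
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));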
Upvotes: 2
Reputation: 1882
Here is my test code to copy many files to HDFS and merge them; it can also filter out other file formats. I think it might help you!
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.IOUtils;

public class FilesMergeToHDFS {

    private static FileSystem fs = null;    // remote HDFS
    private static FileSystem local = null; // local file system

    public static void main(String[] args) throws IOException, URISyntaxException {
        list();
    }

    private static void list() throws IOException, URISyntaxException {
        Configuration conf = new Configuration();
        URI uri = new URI("hdfs://xxx:9000"); // HDFS address
        fs = FileSystem.get(uri, conf);
        local = FileSystem.getLocal(conf);

        // list the local subdirectories, skipping .svn directories
        FileStatus[] dirsStatus = local.globStatus(new Path("E://data/73/*"),
                new RegexExcludePathFilter("^.*svn$"));
        Path[] dirs = FileUtil.stat2Paths(dirsStatus);

        FSDataInputStream in = null;
        FSDataOutputStream out = null;
        for (Path p : dirs) {
            // upload
            String filename = p.getName();
            // collect only the .txt files inside this directory
            FileStatus[] localStatus = local.globStatus(new Path(p + "/*"),
                    new RegexAcceptPathFilter("^.*txt$"));
            Path[] listedPaths = FileUtil.stat2Paths(localStatus);

            // set output path: merge everything in this directory into one HDFS file
            Path block = new Path("hdfs://hadoop:9000/mergehdfs/filesmerge/" + filename + ".txt");
            out = fs.create(block);
            for (Path path : listedPaths) {
                in = local.open(path);
                IOUtils.copyBytes(in, out, 4096, false); // copy data
                in.close();
            }
            if (out != null) {
                out.close();
            }
        }
    }

    /** Accepts only paths that match the given regex. */
    private static class RegexAcceptPathFilter implements PathFilter {
        private final String regex;

        public RegexAcceptPathFilter(String regex) {
            this.regex = regex;
        }

        @Override
        public boolean accept(Path path) {
            return path.toString().matches(regex);
        }
    }

    /** Rejects paths that match the given regex. */
    private static class RegexExcludePathFilter implements PathFilter {
        private final String regex;

        public RegexExcludePathFilter(String regex) {
            this.regex = regex;
        }

        @Override
        public boolean accept(Path path) {
            return !path.toString().matches(regex);
        }
    }
}
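Once the files are merged, the MapReduce job can simply take the merged directory as its input path, e.g. (the path mirrors the one used above):
FileInputFormat.addInputPath(job, new Path("hdfs://hadoop:9000/mergehdfs/filesmerge/"));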
Upvotes: 1