user2336157

Reputation: 11

Multiple files as input to Hadoop DFS and MapReduce

Usually we give one text file as input to the Java program (say, in the simple word-count case). Instead, I now have 100 CSV files that I want to give as input to my Java code. The files can't simply be merged into a single file: I am trying to predict the max/min stock volatility of the given 100 stocks, so each CSV file is unique. How do I give the entire folder of CSV files as the input stream to the Java program?

Upvotes: 1

Views: 1644

Answers (2)

andani

Reputation: 424

Solution 1: We can use the FileInputFormat.addInputPaths() method, which takes a comma-separated list of multiple input paths, and write it as

FileInputFormat.addInputPaths(job, "file0,file1,....");
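For context, here is a minimal driver sketch showing where that call fits; VolatilityDriver, VolatilityMapper, VolatilityReducer, and the CSV file names are hypothetical placeholders for your own classes and paths:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class VolatilityDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "stock volatility");
        job.setJarByClass(VolatilityDriver.class);
        job.setMapperClass(VolatilityMapper.class);   // hypothetical mapper
        job.setReducerClass(VolatilityReducer.class); // hypothetical reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // comma-separated list of input files and/or directories
        FileInputFormat.addInputPaths(job, "stocks/aapl.csv,stocks/goog.csv,stocks/msft.csv");
        FileOutputFormat.setOutputPath(job, new Path("volatility-output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}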

or

Suppose two files need to be analysed to produce a list of the people who are using the services of Facebook and YouTube (and we need a single output file out of these).

We have two files, facebook.txt and youtube.txt:

Path YoutubePath = new Path(args[0]);
Path FacebookPath = new Path(args[1]);
Path outputPath = new Path(args[2]);
MultipleInputs.addInputPath(job, FacebookPath, TextInputFormat.class, JoinFacebookMapper.class);
MultipleInputs.addInputPath(job, YoutubePath, TextInputFormat.class, YoutubeMapper.class);
FileOutputFormat.setOutputPath(job, outputPath);

Adding the lines above to the driver lets multiple input files be processed within a single MapReduce job, each with its own mapper; a rough sketch of one such mapper follows.
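As a hedged illustration only (the original answer does not show the mapper code), here is what YoutubeMapper might look like, assuming youtube.txt holds one user name per line; the Facebook mapper would emit the same key type so a single reducer can join the two sources:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class YoutubeMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // assumption: each line of youtube.txt is a single user name
        context.write(new Text(value.toString().trim()), new Text("youtube"));
    }
}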

or

You can pass an entire folder as the input path argument; FileInputFormat will process every file inside it.
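This is probably the simplest fit for a folder of 100 CSV files. A minimal sketch, assuming args[0] is the folder of CSV files and args[1] is the output directory:

// args[0] = directory containing all the CSV files, args[1] = output directory
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));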

Upvotes: 2

HbnKing

Reputation: 1882

Here is my test code to copy many files to HDFS and merge them; it can also filter out other file formats. I think it might help you!

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.IOUtils;

public class FilesMergeToHDFS {
    private static FileSystem fs = null;    // HDFS file system
    private static FileSystem local = null; // local file system

    public static void main(String[] args) throws IOException, URISyntaxException {
        list();
    }

    private static void list() throws IOException, URISyntaxException {
        Configuration conf = new Configuration();
        URI uri = new URI("hdfs://xxx:9000"); // HDFS address
        fs = FileSystem.get(uri, conf);
        local = FileSystem.getLocal(conf);

        // list local sub-directories, excluding anything ending in "svn"
        FileStatus[] dirsStatus = local.globStatus(new Path("E://data/73/*"),
                new RegexExcludePathFilter("^.*svn$"));
        Path[] dirs = FileUtil.stat2Paths(dirsStatus);
        FSDataInputStream in = null;
        FSDataOutputStream out = null;
        for (Path p : dirs) {
            // upload: merge all matching files in this directory into one HDFS file
            String filename = p.getName();
            FileStatus[] localStatus = local.globStatus(new Path(p + "/*"),
                    new RegexAcceptPathFilter("^.*txt$"));
            Path[] listedPaths = FileUtil.stat2Paths(localStatus);
            // set output path
            Path block = new Path("hdfs://hadoop:9000/mergehdfs/filesmerge/" + filename + ".txt");
            out = fs.create(block);
            for (Path path : listedPaths) {
                in = local.open(path);
                IOUtils.copyBytes(in, out, 4096, false); // copy data
                in.close();
            }
            if (out != null) {
                out.close();
            }
        }
    }

    // accepts only paths that match the given regex
    private static class RegexAcceptPathFilter implements PathFilter {
        private final String regex;

        public RegexAcceptPathFilter(String regex) {
            this.regex = regex;
        }

        @Override
        public boolean accept(Path path) {
            return path.toString().matches(regex);
        }
    }

    // rejects paths that match the given regex
    private static class RegexExcludePathFilter implements PathFilter {
        private final String regex;

        public RegexExcludePathFilter(String regex) {
            this.regex = regex;
        }

        @Override
        public boolean accept(Path path) {
            return !path.toString().matches(regex);
        }
    }
}

Upvotes: 1
