Eileen Jr

Reputation: 91

BufferedReader and BufferedWriter for reading and writing HDFS files

I'm trying to read from an HDFS file line by line, then create an HDFS file and write to it line by line. The code that I use looks like this:

    Path FileToRead = new Path(inputPath);
    FileSystem hdfs = FileToRead.getFileSystem(new Configuration());
    FSDataInputStream fis = hdfs.open(FileToRead);
    BufferedReader reader = new BufferedReader(new InputStreamReader(fis));

    String line;
    line = reader.readLine();
    while (line != null) {

        String[] lineElem = line.split(",");
        for (int i = 0; i < 10; i++) {
            MyMatrix[i][Integer.valueOf(lineElem[0]) - 1] = Double.valueOf(lineElem[i + 1]);
        }

        line = reader.readLine();
    }

    reader.close();
    fis.close();


    Path FileToWrite = new Path(outputPath + "/V");
    FileSystem fs = FileSystem.get(new Configuration());
    FSDataOutputStream fileOut = fs.create(FileToWrite);
    BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fileOut));
    writer.write("check");
    writer.close();
    fileOut.close();

When I run this code, the file V is never created in my outputPath. But if I remove the reading part and keep only the writing part, the file is created and "check" is written into it. Can anyone please help me understand how to use these correctly, so that I can first read the whole file and then write to a file line by line?

I have also tried another piece of code for reading from one file and writing to another one; there the file is created, but nothing is written into it!

I run it with something like this:

  hadoop jar main.jar program2.Main input output

Then in my first job I read from args[0] and write to a file in args[1]+"/NewV" using MapReduce classes, and it works. In my other (non-MapReduce) class I use args[1]+"/NewV" as the input path and output+"/V_0" as the output path (I pass these strings to the constructor). Here is the code for the class:

 public class Init_V {

    String inputPath, outputPath;

    public Init_V(String inputPath, String outputPath) throws Exception {

        this.inputPath = inputPath;
        this.outputPath = outputPath;

        try {
            FileSystem fs = FileSystem.get(new Configuration());
            Path FileToWrite = new Path(outputPath + "/V.txt");
            Path FileToRead = new Path(inputPath);

            BufferedWriter output = new BufferedWriter(
                    new OutputStreamWriter(fs.create(FileToWrite, true)));

            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(fs.open(FileToRead)));

            String data;
            data = reader.readLine();
            while (data != null) {
                output.write(data);
                data = reader.readLine();
            }

            reader.close();
            output.close();
        } catch (Exception e) {
            // any exception is silently swallowed here
        }
    }
}

Upvotes: 0

Views: 8047

Answers (1)

smttsp

Reputation: 4191

I think you need to understand how Hadoop works. In Hadoop, many things are done by the framework: you only give the input and output paths, and they are opened and created by Hadoop if the paths are valid. Check the following example:

public int run(String[] args) throws Exception {

    if (args.length != 2) {
        System.err.println("Usage: MapReduce <input path> <output path>");
        ToolRunner.printGenericCommandUsage(System.err);
        return -1;
    }
    Job job = new Job();
    job.setJarByClass(MyClass.class);
    job.setNumReduceTasks(5);
    job.setJobName("myclass");
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(MyMapper.class);
    job.setReducerClass(MyReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    return job.waitForCompletion(true) ? 0 : 1;
}


/* ----------------------main---------------------*/
public static void main(String[] args) throws Exception{    

    int exitCode = ToolRunner.run(new MyClass(), args);
    System.exit(exitCode);
}
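Note that for ToolRunner.run(new MyClass(), args) to compile, MyClass is assumed to implement Hadoop's Tool interface, typically by extending Configured, roughly like this:

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;

// Assumed outer declaration for the run()/main() example above; run() is the
// Tool method that ToolRunner invokes after parsing the generic Hadoop options.
public class MyClass extends Configured implements Tool {
    // run(String[] args) and main(String[] args) go here, as shown above
}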

As you can see here, you only initialize the necessary variables; the reading and writing is done by Hadoop.

Also, in your Mapper class you call context.write(key, value) inside map, and you do the same in your Reducer class; the framework writes the output for you.
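For illustration, a minimal MyMapper/MyReducer pair matching the job setup above might look like this (the actual map/reduce logic is up to you; the sketch just shows where context.write goes):

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// These would typically be static nested classes of MyClass, or live in their own files.
class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Emit whatever key/value pair you need; Hadoop takes care of the file I/O.
        context.write(new Text("someKey"), value);
    }
}

class MyReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            // context.write sends each pair to the job's output path for you.
            context.write(key, value);
        }
    }
}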

If you use a BufferedWriter/BufferedReader it will write to your local file system, not to HDFS. To see files in HDFS you should run hadoop fs -ls <path>; the files you are looking at with the plain ls command are in your local file system.
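As a rough, self-contained sketch of that distinction (the class name and paths below are made up for illustration): plain java.io writes to the local disk of whatever machine runs the code, while the Hadoop FileSystem API, given the cluster configuration, writes into HDFS where hadoop fs -ls can see it.

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.OutputStreamWriter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LocalVsHdfsWrite {
    public static void main(String[] args) throws Exception {
        // Plain java.io: this file lands on the local file system of the
        // machine (or task node) that happens to execute the code.
        BufferedWriter local = new BufferedWriter(new FileWriter("/tmp/local.txt"));
        local.write("check");
        local.close();

        // Hadoop FileSystem API: with the cluster configuration on the
        // classpath this resolves to HDFS, and the file shows up under
        // `hadoop fs -ls /user/me`.
        FileSystem fs = FileSystem.get(new Configuration());
        BufferedWriter hdfs = new BufferedWriter(
                new OutputStreamWriter(fs.create(new Path("/user/me/hdfs.txt"), true)));
        hdfs.write("check");
        hdfs.close();
    }
}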

EDIT: In order to read/write files yourself, you should know the following: say you have N machines in your Hadoop cluster. When you read, you will not know which mapper is doing the reading, and similarly for writing. So all mappers and reducers should have access to those paths, otherwise you will get exceptions.

I don't know whether you can use any other class, but there are two methods you can use for your specific purpose: setup and cleanup. These methods are called only once per map or reduce task, so if you want to read and write your own files, that is the place to open and close them; the reading and writing itself is the same as in normal Java code. For example, say you want to record something for each key and write it to a txt file. You can do the following (the file path is just an example):

// in the Reducer
private BufferedWriter bw;

@Override
protected void setup(Context context) throws IOException {
    // opened once per reduce task (the local path here is just an example)
    bw = new BufferedWriter(new FileWriter("/tmp/keys.txt"));
}

@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    for (Text value : values) {
        // ... normal reduce work, context.write(...) as usual
    }
    bw.write(key.toString());   // side output written with plain Java I/O
    bw.newLine();
}

@Override
protected void cleanup(Context context) throws IOException {
    bw.close();   // closed once per reduce task
}

Upvotes: 1
