Mikel Urkia

Reputation: 2095

Hadoop Map Whole File in Java

I am trying to use Hadoop in Java with multiple input files. At the moment I have two files: a big one to process and a smaller one that serves as a sort of index.

My problem is that I need to keep the whole index file unsplit while the big file is distributed to each mapper. Does the Hadoop API provide any way to achieve this?

In case I have not expressed myself correctly, here is a link to a picture that represents what I am trying to achieve: picture

Update:

Following the instructions provided by Santiago, I am now able to insert a file (or the URI, at least) from Amazon's S3 into the distributed cache like this:

job.addCacheFile(new Path("s3://myBucket/input/index.txt").toUri());

However, when the mapper tries to read it, a 'file not found' exception occurs, which seems odd to me. I have checked the S3 location and everything seems to be fine. I have used other S3 locations for the input and output files.

Error (note the single slash after the s3:)

FileNotFoundException: s3:/myBucket/input/index.txt (No such file or directory)

The following is the code I use to read the file from the distributed cache:

URI[] cacheFile = output.getCacheFiles();
BufferedReader br = new BufferedReader(new FileReader(cacheFile[0].toString()));
while ((line = br.readLine()) != null) {
     //Do stuff        
}

I am using Amazon EMR, S3 and Hadoop version 2.4.0.

Upvotes: 0

Views: 587

Answers (3)

Mikel Urkia

Reputation: 2095

Here's what helped me to solve the problem.

Since I am using Amazon's EMR with S3, I needed to change the syntax a bit, as stated on the following site.

It was necessary to append the name the system will use to read the file from the cache, as follows:

job.addCacheFile(new URI("s3://myBucket/input/index.txt" + "#index.txt"));

This way, the program understands that the file placed in the cache is simply named index.txt. I also needed to change the way the file is read from the cache: instead of the full path stored in the distributed cache, only the filename has to be used, as follows:

URI[] cacheFile = output.getCacheFiles(); // the URIs are still listed, but they are not needed to open the file
BufferedReader br = new BufferedReader(new FileReader("index.txt"));
String line;
while ((line = br.readLine()) != null) {
     //Do stuff        
}
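
For completeness, here is a rough sketch of how this can look inside a mapper (the class and field names are illustrative and the map logic is just a placeholder), with the index loaded once per mapper in setup():

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class IndexAwareMapper extends Mapper<LongWritable, Text, Text, Text> {

    // In-memory copy of the index, loaded once per mapper in setup().
    private final List<String> index = new ArrayList<String>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // "index.txt" is the name given by the #index.txt fragment
        // passed to job.addCacheFile(...) in the driver.
        BufferedReader br = new BufferedReader(new FileReader("index.txt"));
        String line;
        while ((line = br.readLine()) != null) {
            index.add(line);
        }
        br.close();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each record of the big (split) input can now be processed against
        // the full index held in memory; this output is only a placeholder.
        context.write(value, new Text("indexSize=" + index.size()));
    }
}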

Upvotes: 1

Vijay Bhoomireddy

Reputation: 576

As mentioned above, add your index file to the Distributed Cache and then access it in your mapper. Behind the scenes, the Hadoop framework ensures that the index file is sent to all the task trackers before any task is executed, so it is available for your processing. This way, the data is transferred only once and is available to all the tasks related to your job.

However, instead of adding the index file to the Distributed Cache programmatically in your driver code, make your driver implement the Tool interface, run it through ToolRunner, and override the run method. This gives you the flexibility of passing the index file to the Distributed Cache from the command line when submitting the job.
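
A minimal sketch of such a driver could look like the following (class names are illustrative; your own job configuration goes inside run):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // ToolRunner has already stripped the generic options (-files, -D, ...)
        // from args and applied them to the Configuration returned by getConf().
        Job job = Job.getInstance(getConf(), "whole-file index join");
        job.setJarByClass(MyDriver.class);
        job.setMapperClass(MyMapper.class); // your mapper class
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new MyDriver(), args));
    }
}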

If you are using ToolRunner, you can add files to the Distributed Cache directly from the command line when you run the job; there is no need to copy the file to HDFS first. Use the -files option to add files:

hadoop jar yourjarname.jar YourDriverClassName -files cachefile1,cachefile2,cachefile3,...

You can access the files in your Mapper or Reducer code as below:

File f1 = new File("cachefile1");
File f2 = new File("cachefile2");
File f3 = new File("cachefile3");

Upvotes: 2

Santiago Cepas

Reputation: 4094

You could push the index file to the distributed cache, and it will be copied to the nodes before the mapper is executed.

See this SO thread.
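
A minimal driver-side sketch of the idea (the path and class name are placeholders):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class CacheSetupExample {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "example");
        // Register the index file; Hadoop copies it to every node before
        // the map tasks start, however the big input file is split.
        job.addCacheFile(new URI("hdfs:///path/to/index.txt"));
        // ... set mapper, input/output paths, and submit as usual.
    }
}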

Upvotes: 1
