Shashank Mudlapur

Reputation: 41

Retrieve Filename of current line in Mapper

I am using Hadoop version 2.6.4. I was writing a MapReduce job that takes three arguments: a keyword, the path to the input files, and the path to the output files. My ideal output should be the names of all files containing the keyword. The simple logic is to go through each line of the text and match it against the keyword; if it matches, print the filename.

After extensive googling I found three options for obtaining the filename:

  1. Context.getConfiguration().get("map.input.file")
  2. Context.getConfiguration().get("mapreduce.map.input.file")

Both methods returned a string with the value 'null', i.e. they printed 'null' on my terminal screen.

  3. Finally I tried this, from site.google.com:

    public Path filesplit;
    filesplit = ((FileSplit) context.getInputSplit()).getPath();
    System.out.println(filesplit.getName());
    

The above method produced an error. The terminal output looked like this:

java.lang.Exception: java.lang.ClassCastException: org.apache.hadoop.mapreduce.lib.input.FileSplit cannot be cast to org.apache.hadoop.mapred.FileSplit
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.lang.ClassCastException: org.apache.hadoop.mapreduce.lib.input.FileSplit cannot be cast to org.apache.hadoop.mapred.FileSplit
    at org.myorg.DistributedGrep$GrepMapper.map(DistributedGrep.java:23)
    at org.myorg.DistributedGrep$GrepMapper.map(DistributedGrep.java:1)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
  1. Can anybody suggest a remedy for these errors? What might have gone wrong, and are there any mistakes on my part?

  2. Or do you have any other ideas for obtaining the filename of the line currently being processed by the mapper?

If you want to take a full look at the code, here it is:

package org.myorg;

import java.io.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class DistributedGrep {

    public static class GrepMapper extends
            Mapper<Object, Text, NullWritable, Text> {
        public  Path filesplit;

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            filesplit = ((FileSplit)context.getInputSplit()).getPath();

            String txt = value.toString();
            String mapRegex = context.getConfiguration().get("mapregex");
//          System.out.println(context.getConfiguration().get("mapreduce.map.input.file");
            System.out.println(filesplit.getName());
            if (txt.matches(mapRegex)) {
                System.out.println("Matched a line");
                context.write(NullWritable.get(), value);
            }
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        conf.set("mapregex", args[0]);

        Job job = new Job(conf, "Distributed Grep");
        job.setJarByClass(DistributedGrep.class);
        job.setMapperClass(GrepMapper.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(0); // Set number of reducers to zero
        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Upvotes: 3

Views: 1039

Answers (1)

Binary Nerd

Reputation: 13927

Options 1 & 2

    Context.getConfiguration().get("map.input.file")
    Context.getConfiguration().get("mapreduce.map.input.file")

I believe both of these return null because they should be used with the older mapred API and its JobConf configuration object. Your #3 option is the way to do it for the mapreduce API.

Using the mapred API you would do the following:

public void configure(JobConf job) {
  inputFile = job.get(JobContext.MAP_INPUT_FILE);
}

This is shown here: https://hadoop.apache.org/docs/r2.7.2/api/org/apache/hadoop/mapred/Mapper.html

The JobContext.MAP_INPUT_FILE constant value used to be map.input.file and was changed to mapreduce.map.input.file at some point.

https://hadoop.apache.org/docs/r2.7.2/hadoop-project-dist/hadoop-common/DeprecatedProperties.html
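
If you did want to stay on the mapred API, a minimal sketch of a complete old-API mapper might look like the following (the class name, field name, and the pass-through map logic are illustrative, not taken from your code):

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    // Old (mapred) API: the file name is read from the JobConf in configure(),
    // because this API has no Context object to ask at map() time.
    public class OldApiGrepMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, NullWritable, Text> {

        private String inputFile;

        @Override
        public void configure(JobConf job) {
            // "mapreduce.map.input.file" is the current name of the property
            // that used to be called "map.input.file"
            inputFile = job.get("mapreduce.map.input.file");
        }

        @Override
        public void map(LongWritable key, Text value,
                        OutputCollector<NullWritable, Text> output, Reporter reporter)
                throws IOException {
            System.out.println("Current file: " + inputFile);
            output.collect(NullWritable.get(), value);
        }
    }

Note this only applies if the whole job is driven through the mapred API (JobConf/JobClient); dropping it into your current mapreduce-API job would reintroduce the same kind of API clash.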

Option 3

You're mixing MapReduce APIs. There are two: the mapred API and the mapreduce API.

You can see this in the error you're getting:

Caused by: java.lang.ClassCastException: org.apache.hadoop.mapreduce.lib.input.FileSplit cannot be cast to org.apache.hadoop.mapred.FileSplit

You've imported:

import org.apache.hadoop.mapred.FileSplit;

Which is from the mapred API, but you're using the mapreduce API. Change the import to:

import org.apache.hadoop.mapreduce.lib.input.FileSplit;

One clue in your code (aside from the import) that this has happened is the cast you needed to make the code compile:

filesplit = ((FileSplit) context.getInputSplit()).getPath();

The cast itself is not the problem: even with the mapreduce API, getInputSplit() returns the abstract InputSplit, so you still need to cast it to FileSplit to call getPath(). The problem is that the cast currently targets the mapred FileSplit while the framework hands you a mapreduce one, which is exactly what the ClassCastException says. Once the import points at the mapreduce FileSplit, the same line works unchanged.
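
Putting it together, a sketch of the corrected GrepMapper (a drop-in for the inner class in DistributedGrep, with everything else unchanged) could look like this; emitting the file name instead of the matching line is my assumption based on your stated goal of listing the files that contain the keyword:

    // Replace the old import
    //   import org.apache.hadoop.mapred.FileSplit;
    // with the mapreduce one:
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    public static class GrepMapper extends
            Mapper<Object, Text, NullWritable, Text> {

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // The cast stays: getInputSplit() returns the abstract InputSplit,
            // but it now targets the mapreduce FileSplit, so no ClassCastException.
            Path filesplit = ((FileSplit) context.getInputSplit()).getPath();
            String fileName = filesplit.getName();

            String mapRegex = context.getConfiguration().get("mapregex");
            if (value.toString().matches(mapRegex)) {
                // Assumption: write the file name rather than the matching line,
                // since the goal is a list of files containing the keyword.
                context.write(NullWritable.get(), new Text(fileName));
            }
        }
    }

Note that this emits the file name once per matching line; de-duplicating the output would need a reducer (your job sets reducers to zero) or a post-processing step.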

Upvotes: 2
