RAVITEJA SATYAVADA

Reputation: 2571

Writing a value to a file without sending it to the reducer

I have an input of records like this: a|1|Y, b|0|N, c|1|N, d|2|Y, e|1|Y

Now, in the mapper, I have to check the value of the third column. If it is 'Y', that record has to be written directly to the output file without being sent to the reducer; the 'N' value records have to go to the reducer for further processing.

So a|1|Y, d|2|Y, e|1|Y should not go to the reducer, but b|0|N, c|1|N should go to the reducer and then to the output file.

How can I do this?

Upvotes: 2

Views: 133

Answers (3)

Tariq

Reputation: 34184

See if this works,

import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Xxxx {

    public static class MyMapper extends
            Mapper<LongWritable, Text, LongWritable, Text> {

        private FSDataOutputStream out;

        @Override
        protected void setup(Context context) throws IOException {
            // Open one side file per map task (not one per record). The
            // random suffix keeps the file name unique across tasks.
            FileSystem fs = FileSystem.get(context.getConfiguration());
            FileSplit split = (FileSplit) context.getInputSplit();
            String fileName = split.getPath().getName();
            out = fs.create(new Path(fileName + "-m-" + new Random().nextInt()));
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            for (String s : value.toString().split(",")) {
                String[] parts = s.trim().split("\\|");
                if (parts[2].equals("Y")) {
                    // 'Y' records go straight to the side file.
                    out.writeBytes(s.trim() + "\n");
                } else {
                    // 'N' records continue on to the reducer.
                    context.write(key, new Text(s.trim()));
                }
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException {
            // Close the stream, but not the FileSystem: FileSystem.get()
            // returns a shared, cached instance the framework still needs.
            out.close();
        }
    }

    public static class MyReducer extends
            Reducer<LongWritable, Text, LongWritable, Text> {
        @Override
        public void reduce(LongWritable key, Iterable<Text> values,
                Context context) throws IOException, InterruptedException {
            for (Text t : values) {
                context.write(key, t);
            }
        }
    }

    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://localhost:9000");
        conf.set("mapred.job.tracker", "localhost:9001");
        Job job = new Job(conf, "Xxxx");
        job.setJarByClass(Xxxx.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        FileInputFormat.addInputPath(job, new Path("/input.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/output_path"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
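
One caveat with this approach: the 'Y' records are written as side files directly to HDFS, outside the job's normal output-commit path. If a map task fails and is retried, or runs speculatively, the files from the killed attempt are not cleaned up, so duplicates are possible. Disabling speculative execution for maps (mapred.map.tasks.speculative.execution=false in this old-style configuration) narrows that window but does not remove it.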

Upvotes: 1

triya.chd

Reputation: 13

In your map function, you will get the input line by line. Split it using | as the delimiter (with the String.split() method, to be exact). Note that split() takes a regular expression, so the pipe has to be escaped:

String[] line = value.toString().split("\\|");

Access the third element of this array as line[2].

Then, using a simple if-else statement, emit only the records with the 'N' value for further processing.
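
A minimal sketch of that mapper (the class name and the emitted key are illustrative, assuming the default TextInputFormat with one pipe-delimited record per input line):

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper showing the split-and-check step described above.
public class FlagMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // String.split() takes a regex, so the pipe must be escaped.
        String[] line = value.toString().split("\\|");
        if (line.length >= 3 && line[2].equals("N")) {
            // Only 'N' records go on to the reducer for further processing.
            context.write(new Text(line[0]), value);
        }
        // 'Y' records would still need to be written out by some other
        // means, e.g. the side-file or MultipleOutputs techniques in the
        // other answers.
    }
}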

Upvotes: -1

SSaikia_JtheRocker

Reputation: 5063

What you can probably do is use MultipleOutputs to separate out records of the 'Y' and 'N' types into two different files from the mappers.

Next, you run separate jobs on the two newly generated 'Y' and 'N' data sets. For the 'Y' set, set the number of reducers to 0 so that reducers aren't used; for the 'N' set, process it however you want using reducers. A sketch of the first, splitting stage is below.
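
Here is a minimal sketch of that splitting stage (the class name, named outputs, and paths are illustrative, assuming the new mapreduce API and the default TextInputFormat):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SplitByFlag {

    public static class SplitMapper
            extends Mapper<LongWritable, Text, NullWritable, Text> {

        private MultipleOutputs<NullWritable, Text> mos;

        @Override
        protected void setup(Context context) {
            mos = new MultipleOutputs<NullWritable, Text>(context);
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            for (String record : value.toString().split(",")) {
                String[] parts = record.trim().split("\\|");
                // Route each record by its third column to the "Y" or "N"
                // named output; files come out as Y-m-* and N-m-*.
                mos.write(parts[2].equals("Y") ? "Y" : "N",
                        NullWritable.get(), new Text(record.trim()));
            }
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            mos.close();
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "split-by-flag");
        job.setJarByClass(SplitByFlag.class);
        job.setMapperClass(SplitMapper.class);
        job.setNumReduceTasks(0);   // map-only splitting stage
        MultipleOutputs.addNamedOutput(job, "Y", TextOutputFormat.class,
                NullWritable.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "N", TextOutputFormat.class,
                NullWritable.class, Text.class);
        // Suppress the empty default part-* files; everything goes to the
        // named outputs.
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

After this job finishes, the Y-m-* files are already final output, and the N-m-* files become the input of the second, reducer-based job. Unlike side files written straight to HDFS, these outputs go through the normal output commit, so retried or speculative tasks don't leave duplicates.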

Hope this helps.

Upvotes: 2
