vivman
vivman

Reputation: 365

Map Reduce Filter records

I have a set of records where I need to process only the male records. In my MapReduce program I used an if condition to filter only the male records, but the program below gives zero records as output.

Input file:

1,Brandon Buckner,avil,female,525
2,Veda Hopkins,avil,male,633
3,Zia Underwood,paracetamol,male,980
4,Austin Mayer,paracetamol,female,338
5,Mara Higgins,avil,female,153
6,Sybill Crosby,avil,male,193
7,Tyler Rosales,paracetamol,male,778
8,Ivan Hale,avil,female,454
9,Alika Gilmore,paracetamol,female,833
10,Len Burgess,metacin,male,325

Mapreduce Program:

package org.samples.mapreduce.training;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


public class patientrxMR_filter {

    public static class MapDemohadoop extends
            Mapper<LongWritable, Text, Text, IntWritable> {

        // setup , map, run, cleanup

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] elements = line.split(",");



String gender =elements[3];


if ( gender == "male" ) {

    Text tx = new Text(elements[2]);
                int i = Integer.parseInt(elements[4]);
                IntWritable it = new IntWritable(i);
                context.write(tx, it);
}
        }
    }

    public static class Reduce extends
            Reducer<Text, IntWritable, Text, IntWritable> {

        // setup, reduce, run, cleanup
        // innput - para [150,100]
        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {

        if (args.length != 2) {
            System.err.println("Insufficient args");
            System.exit(-1);
        }
        Configuration conf = new Configuration();

        //conf.set("fs.default.name","hdfs://localhost:50000");
        conf.set("mapred.job.tracker", "hdfs://localhost:50001");

//      conf.set("DrugName", args[3]);
        Job job = new Job(conf, "Drug Amount Spent");

        job.setJarByClass(patientrxMR_filter.class); // class conmtains mapper and
                                                // reducer class

        job.setMapOutputKeyClass(Text.class); // map output key class
        job.setMapOutputValueClass(IntWritable.class);// map output value class
        job.setOutputKeyClass(Text.class); // output key type in reducer
        job.setOutputValueClass(IntWritable.class);// output value type in
                                                    // reducer

        job.setMapperClass(MapDemohadoop.class);
        job.setReducerClass(Reduce.class);
        job.setNumReduceTasks(1);
        job.setInputFormatClass(TextInputFormat.class); // default -- inputkey
                                                        // type -- longwritable
                                                        // : valuetype is text
        job.setOutputFormatClass(TextOutputFormat.class);



        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);

    }

}

Upvotes: 1

Views: 4996

Answers (2)

devendra singh
devendra singh

Reputation: 31

if ( gender == "male" ) 

This line doesn't work for an equality check: `==` compares object references, not string contents. For string equality in Java, use `Object.equals()`,

i.e. 
if ( gender.equals("male") )

Upvotes: 3

Zhangyi Jin
Zhangyi Jin

Reputation: 11

public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] elements = line.split(",");

Hadoop uses a distributed file system. In `String line = value.toString();`, `line` is the file content of a block, which has an offset (the key). In this case, `line` loads the entire test file — which apparently fits into one block — instead of each line of the file as you expected.

Upvotes: 1

Related Questions