Reputation: 365
I have a set of records from which I need to process only the male records. In my MapReduce program I used an if condition to filter out the male records, but the program below gives zero records as output.
Input file:
1,Brandon Buckner,avil,female,525
2,Veda Hopkins,avil,male,633
3,Zia Underwood,paracetamol,male,980
4,Austin Mayer,paracetamol,female,338
5,Mara Higgins,avil,female,153
6,Sybill Crosby,avil,male,193
7,Tyler Rosales,paracetamol,male,778
8,Ivan Hale,avil,female,454
9,Alika Gilmore,paracetamol,female,833
10,Len Burgess,metacin,male,325
MapReduce program:
package org.samples.mapreduce.training;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class patientrxMR_filter {

    public static class MapDemohadoop extends
            Mapper<LongWritable, Text, Text, IntWritable> {

        // setup, map, run, cleanup
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] elements = line.split(",");
            String gender = elements[3];
            if ( gender == "male" ) {
                Text tx = new Text(elements[2]);
                int i = Integer.parseInt(elements[4]);
                IntWritable it = new IntWritable(i);
                context.write(tx, it);
            }
        }
    }

    public static class Reduce extends
            Reducer<Text, IntWritable, Text, IntWritable> {

        // setup, reduce, run, cleanup
        // input - e.g. para [150, 100]
        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Insufficient args");
            System.exit(-1);
        }

        Configuration conf = new Configuration();
        // conf.set("fs.default.name", "hdfs://localhost:50000");
        conf.set("mapred.job.tracker", "hdfs://localhost:50001");
        // conf.set("DrugName", args[3]);

        Job job = new Job(conf, "Drug Amount Spent");
        job.setJarByClass(patientrxMR_filter.class); // class contains mapper and reducer classes

        job.setMapOutputKeyClass(Text.class);          // map output key class
        job.setMapOutputValueClass(IntWritable.class); // map output value class
        job.setOutputKeyClass(Text.class);             // output key type in reducer
        job.setOutputValueClass(IntWritable.class);    // output value type in reducer

        job.setMapperClass(MapDemohadoop.class);
        job.setReducerClass(Reduce.class);
        job.setNumReduceTasks(1);

        job.setInputFormatClass(TextInputFormat.class); // default -- key type is LongWritable, value type is Text
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}
Upvotes: 1
Views: 4996
Reputation: 31
if ( gender == "male" )
This line doesn't perform an equality check on the string contents; == compares object references. For string equality in Java, please use Object.equals(), i.e.
if ( gender.equals("male") )
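A minimal, self-contained sketch of the difference; the demo class and literals here are illustrative, not part of the original program:

public class StringEqualityDemo {
    public static void main(String[] args) {
        // Simulates a field parsed out of an input line, as in the mapper.
        String gender = new String("male");

        // == compares object references, so this prints false even though
        // the contents match: the parsed field is a distinct String object.
        System.out.println(gender == "male");

        // equals() compares character contents, so this prints true.
        System.out.println(gender.equals("male"));
    }
}

With gender.equals("male") in the mapper's if condition, the male records pass the filter and the job produces output.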
Upvotes: 3
Reputation: 11
public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    String line = value.toString();
    String[] elements = line.split(",");
Hadoop uses a distributed file system. In "String line = value.toString();", the value is the file content within a block, keyed by its offset. In this case, the line loads the entire test file, which apparently fits into one block, instead of each line of the file as you expected.
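If that diagnosis were right and the value carried more than one line, a defensive mapper could split the value on newlines before parsing the fields. This is only a sketch under that assumption (it also fixes the == comparison), written as a drop-in replacement for the question's map method; with the standard TextInputFormat each call sees a single line anyway, and the loop still behaves correctly:

public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    // Defensively treat the value as possibly holding several lines;
    // \r?\n also handles Windows line endings.
    for (String line : value.toString().split("\\r?\\n")) {
        String[] elements = line.split(",");
        if (elements.length < 5) {
            continue; // skip blank or malformed lines
        }
        if (elements[3].equals("male")) { // content comparison, not ==
            context.write(new Text(elements[2]),
                    new IntWritable(Integer.parseInt(elements[4])));
        }
    }
}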
Upvotes: 1