Reputation: 189
I have an input file
UserId|TrackId|Shared|Radio|Skip
111115|222|0|1|0
111113|225|1|0|0
111117|223|0|1|1
111115|225|1|0|0
I need to add up the Shared and Radio columns for each track ID. The output should be
222,1
223,1
225,2
With the below program that I have written, I get
222,1
223,1
225,1
225,2.
Not sure what the error is
This is my program
/**
 * Hadoop MapReduce job that reads pipe-delimited lines and, keyed by the
 * second field (the track id), adds together the third and fourth fields
 * (the Shared and Radio columns).
 */
public class Total {
/**
 * Mapper: parses one input line and emits (trackid, shared + radio).
 */
public static class ListenMap extends Mapper<LongWritable, Text, Text, IntWritable>
{
/**
 * Splits the line on '|' and writes the track id with the sum of the two
 * numeric columns that follow it.
 *
 * @param key     input key supplied by the framework; not used here
 * @param values  one line of input text
 * @param context sink for the (trackid, total) pair
 */
public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException
{
String slt= values.toString();
// "[|]" is a regex character class, so this splits on the literal pipe character.
String arr[]= slt.split("[|]");
String trackid= arr[1];
String shared= arr[2];
String radio= arr[3];
// NOTE(review): parseInt throws NumberFormatException on non-numeric text, so a
// header row such as "UserId|TrackId|Shared|Radio|Skip" would fail here — confirm
// the real input has no header line.
int sharenum= Integer.parseInt(shared);
int radionum= Integer.parseInt(radio);
int total= sharenum+radionum;
context.write(new Text(trackid), new IntWritable(total));
}
}
/**
 * Reducer: accumulates the per-line totals received for one track id.
 */
public static class ListenReduce extends Reducer<Text, IntWritable, Text, IntWritable>
{
/**
 * Adds each incoming value to a running sum and writes the running sum.
 *
 * @param key     track id
 * @param values  the totals emitted by the mapper for this track id
 * @param context sink for the output records
 */
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
{
int sum=0;
for(IntWritable x: values)
{
sum+=x.get();
// NOTE(review): this write is inside the loop, so one record carrying the running
// sum is emitted per value; a key with two values produces two output lines
// (e.g. "225,1" then "225,2") instead of a single final total.
context.write(key, new IntWritable(sum));
}
}
}
/**
 * Configures and runs the job.
 *
 * @param args args[0] is the input path, args[1] is the output path
 */
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException{
Configuration conf= new Configuration();
// NOTE(review): newer Hadoop releases favour Job.getInstance(conf, name) over this
// constructor — confirm against the Hadoop version in use.
Job job= new Job(conf, "listen");
job.setJarByClass(Total.class);
job.setMapperClass(ListenMap.class);
job.setReducerClass(ListenReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// NOTE(review): exits with status 1 when waitForCompletion returns true and 0
// otherwise, which is the reverse of the usual "0 means success" convention.
System.exit(job.waitForCompletion(true)? 1:0);
}
}
Upvotes: 1
Views: 81
Reputation: 20820
You are writing to the context inside the for loop, which is why you see duplicate keys.
Instead, it should be written only once for each key, after the loop.
/**
 * Reducer that collapses all values received for a key into a single total.
 */
public static class ListenReduce extends Reducer<Text, IntWritable, Text, IntWritable>
{
    /**
     * Sums every value for {@code key} and writes exactly one (key, total) record.
     *
     * @param key     the grouping key
     * @param values  the values grouped under that key
     * @param context sink for the single output record
     */
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
    {
        int total = 0;
        for (final IntWritable count : values)
        {
            total = total + count.get();
        }
        // Emit once per key, only after every value has been added.
        final IntWritable result = new IntWritable(total);
        context.write(key, result);
    }
}
Upvotes: 1
Reputation: 191681
Move context.write(key, new IntWritable(sum));
outside the loop unless you want to print each value of sum after you increment it.
I'm going to assume the period is a typo in asking the question because your code isn't adding that.
Upvotes: 1
Reputation: 13927
You write out the result in the for loop. Move it outside:
/**
 * Reducer producing one summed record per key.
 */
public static class ListenReduce extends Reducer<Text, IntWritable, Text, IntWritable>
{
    /**
     * Accumulates the values for {@code key}; the write happens after the loop
     * so that only the final sum is output.
     */
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
    {
        int runningSum = 0;

        for (IntWritable value : values) {
            int amount = value.get();
            runningSum += amount;
        }

        context.write(key, new IntWritable(runningSum));
    }
}
Upvotes: 1