Reputation: 1437
I want to write a MapReduce job that counts the number of records in a given CSV file. I am not sure what to do in the map step and what to do in the reduce step. How should I go about solving this? Can anyone suggest an approach?
Upvotes: 2
Views: 11849
Reputation: 711
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class LineCount {

    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private final Text word = new Text("Total Lines");

        // Emit ("Total Lines", 1) for every input record.
        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            output.collect(word, one);
        }
    }

    public static class Reduce extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {

        // Sum up all the 1s emitted for the single key.
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(LineCount.class);
        conf.setJobName("LineCount");
        // There is only one key, so a single reducer is enough;
        // extra reducers would just produce empty output files.
        conf.setNumReduceTasks(1);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Reduce.class); // pre-sum locally to cut shuffle traffic
        conf.setReducerClass(Reduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}
Upvotes: 0
Reputation: 6139
I hope I have a better solution than the accepted answer.
Instead of emitting a 1 for each record, why not just increment a counter in map() and emit the accumulated count once per map task in cleanup()?
This reduces the intermediate reads and writes, and the reducer only needs to aggregate a handful of values (one per map task).
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LineCntMapper extends
        Mapper<LongWritable, Text, Text, IntWritable> {

    Text keyEmit = new Text("Total Lines");
    IntWritable valEmit = new IntWritable();
    int partialSum = 0;

    // Write nothing per record; just count.
    @Override
    public void map(LongWritable key, Text value, Context context) {
        partialSum++;
    }

    // Runs once at the end of each map task: emit that task's total.
    @Override
    public void cleanup(Context context)
            throws IOException, InterruptedException {
        valEmit.set(partialSum);
        context.write(keyEmit, valEmit);
    }
}
You can find full working code here
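For completeness, a driver for this mapper might look like the following. This is only a sketch (the class name LineCntDriver and the paths are my own assumptions, not from the answer); it uses Hadoop's bundled IntSumReducer to add up the per-task partial sums.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

public class LineCntDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "LineCnt");
        job.setJarByClass(LineCntDriver.class);
        job.setMapperClass(LineCntMapper.class);
        // IntSumReducer ships with Hadoop and sums the partial counts,
        // one value per map task.
        job.setReducerClass(IntSumReducer.class);
        job.setNumReduceTasks(1); // single key, so one reducer suffices
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```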
Upvotes: 2
Reputation: 366
I'd just use the identity Mapper and the identity Reducer, i.e. the base Mapper.class and Reducer.class. Then read the "Map input records" counter from the finished job.
You really don't have to write any code to get this.
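If you go this route, the count is already maintained by the framework; a minimal sketch of reading it after the job finishes (the helper class and method names are illustrative, not part of any answer):

```java
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCounter;

public class RecordCountReader {
    // Reads Hadoop's built-in "Map input records" counter from a finished job.
    public static long mapInputRecords(Job job) throws Exception {
        return job.getCounters()
                  .findCounter(TaskCounter.MAP_INPUT_RECORDS)
                  .getValue();
    }
}
```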
Upvotes: 0
Reputation: 51
Use job.getCounters() to retrieve the values you incremented per record, after the job has completed. If you are using Java to write your MapReduce job, use an enum as the counting mechanism.
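A minimal sketch of this enum-counter approach (the class and enum names are my own assumptions; the mapper emits nothing, so the job can run with zero reduce tasks):

```java
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

public class CounterLineCount {

    // Hypothetical enum serving as the counter's identity.
    public enum CsvCounters { RECORDS }

    public static class CountMapper
            extends Mapper<LongWritable, Text, NullWritable, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) {
            // No output at all; just bump the counter once per record.
            context.getCounter(CsvCounters.RECORDS).increment(1);
        }
    }

    // After job.waitForCompletion(true), read the aggregated counter.
    public static long totalRecords(Job job) throws Exception {
        return job.getCounters().findCounter(CsvCounters.RECORDS).getValue();
    }
}
```

Set `job.setNumReduceTasks(0)` in the driver, since the reduce phase is not needed at all here.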
Upvotes: 0
Reputation: 10642
Your mapper must emit a fixed key (just use a Text with the value "count") and a fixed value of 1 (same as in the word-count example).
Then simply use a LongSumReducer as your reducer.
The output of your job will be a single record with the key "count", whose value is the number of records you are looking for.
You have the option of (dramatically!) improving performance by using the same LongSumReducer as a combiner.
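A sketch of the fixed-key mapper described above, assuming the newer mapreduce API (class and field names are illustrative, not from the answer):

```java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Emits ("count", 1) for every input record, exactly as in word count
// but with a single fixed key.
public class FixedKeyMapper
        extends Mapper<LongWritable, Text, Text, LongWritable> {

    private static final Text COUNT_KEY = new Text("count");     // fixed key
    private static final LongWritable ONE = new LongWritable(1); // fixed value

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        context.write(COUNT_KEY, ONE); // one "1" per input record
    }
}
```

In the driver, `job.setReducerClass(LongSumReducer.class)` and `job.setCombinerClass(LongSumReducer.class)` (from `org.apache.hadoop.mapreduce.lib.reduce`) sum the ones; the combiner collapses each map task's output to a single pair before the shuffle, which is where the dramatic speedup comes from.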
Upvotes: 3