xeon123

Reputation: 899

Map class does not work as expected

I have this map class [1] whose only goal is to read the input and write it out in the same format. The input data is shown in [2], and it is processed by the map class. I want the map class to apply no transformation at all and simply output the input data. Unfortunately, I get the error in [3], and I don't understand where the map class is wrong. Any help fixing the map class?

[1] My map class (it has now been corrected).

/** Identity mapper set by the user. */
public static class MyFullyIndentityMapper
        extends Mapper<LongWritable, Text, Text, IntWritable>{

    private Text word = new Text();
    private final static IntWritable one = new IntWritable(1);

    public void map(LongWritable key, Text value, Context context
    ) throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
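            // Note: the second nextToken() below is most likely the call at
            // MyWordCount.java:93 in the stack trace [3]; it throws
            // NoSuchElementException when the value holds only one token.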
            context.write(word, new IntWritable(Integer.valueOf(itr.nextToken())));
        }
    }

    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        try {
            while (context.nextKeyValue()) {
                System.out.println("Key ( " + context.getCurrentKey().getClass().getName() + " ): " + context.getCurrentKey()
                        + " Value (" + context.getCurrentValue().getClass().getName() + "): " + context.getCurrentValue());
                map(context.getCurrentKey(), context.getCurrentValue(), context);
            }
        } finally {
            cleanup(context);
        }
    }
}

[2] Input data

B   1
C   1
I   1
O   1
C   1
E   1
B   1
B   1
B   1
B   1

[3] Error that I get during the execution of the map class.

Key ( org.apache.hadoop.io.LongWritable ): 0 Value (org.apache.hadoop.io.Text): B
2015-10-11 11:59:54,680 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : java.util.NoSuchElementException
  at java.util.StringTokenizer.nextToken(StringTokenizer.java:349)
  at org.apache.hadoop.mapred.examples.MyWordCount$MyFullyIndentityMapper.map(MyWordCount.java:93)
  at org.apache.hadoop.mapred.examples.MyWordCount$MyFullyIndentityMapper.run(MyWordCount.java:104)
  at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
  at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
  at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
  at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

[4] My Reduce class

public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context
    ) throws IOException, InterruptedException {
        int sum = 0;
        Iterator iter = values.iterator();
        while(iter.hasNext()){
            System.out.println(iter.next());
        }
        for (IntWritable val : values) {
            System.out.println(" - key ( " + key.getClass().toString() + "): " + key.toString()
                    + " value ( " + val.getClass().toString() + " ): " + val.toString());
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }

    public void run(Context context) throws IOException, InterruptedException {
        System.out.println("Output dir: " + context.getConfiguration().get("mapred.output.dir"));
        System.out.println("Partitioner class: " + context.getConfiguration().get("mapreduce.partitioner.class"));
        try {
            while (context.nextKey()) {
                System.out.println("Key: " + context.getCurrentKey());
                reduce(context.getCurrentKey(), context.getValues(), context);
            }
        } finally {
            cleanup(context);
        }
    }
}

[5] Main class

public static void main(String[] args) throws Exception {
    GenericOptionsParser parser = new GenericOptionsParser(new Configuration(), args);

    String[] otherArgs = parser.getRemainingArgs();
    if (otherArgs.length < 2) {
        System.err.println("Usage: wordcount [<in>...] <out>");
        System.exit(2);
    }

    // first map tasks
    JobConf conf = new JobConf(MyWordCount.class);
    conf.setJobName("wordcount");

    conf.setJarByClass(MyWordCount.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);
    conf.setNumReduceTasks(1);

    Path[] inputPaths = new Path[otherArgs.length-1];
    for (int i = 0; i < otherArgs.length - 1; ++i) { inputPaths[i] = new Path(otherArgs[i]); }
    Path outputPath =  new Path(otherArgs[otherArgs.length - 1]);
    FileInputFormat.setInputPaths(conf, inputPaths);
    FileOutputFormat.setOutputPath(conf, outputPath);

    // launch the job directly
    Job job = new Job(conf, conf.getJobName());
    job.setJarByClass(MyWordCount.class);
    job.setMapperClass(MyFullyIndentityMapper.class);
    job.setReducerClass(MyReducer.class);
    job.setPartitionerClass(HashPartitioner.class);

    job.waitForCompletion(true);

    System.exit(0);
}

[6] Here are the imports that I use, in case they are needed

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.StringTokenizer;

Upvotes: 0

Views: 208

Answers (2)

xeon123

Reputation: 899

Found the problem. The signature must be public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {...}
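
For reference, a minimal sketch of the reducer with the @Override annotation added (imports as in [6]). @Override makes the compiler fail the build if the method does not actually override Reducer.reduce, instead of the mismatched method being silently ignored and the default identity reduce running at runtime:

public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    // @Override fails compilation if the signature drifts from
    // Reducer<Text, IntWritable, Text, IntWritable>.reduce(...)
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}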

Upvotes: 0

madhu

Reputation: 1170

Please check your input file again.

Key ( org.apache.hadoop.io.LongWritable ): 0 Value (org.apache.hadoop.io.Text): B

From the above line you can see that the context is fetching your value as just B, not as B 1. Therefore, when the mapper tries to fetch the next token from the StringTokenizer to use as the count, there is no token left and it throws the error.
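
In case it helps, a minimal defensive sketch of the map method from [1] (assuming a line may yield fewer than two tokens): it checks hasMoreTokens() before reading the count, so malformed lines are skipped instead of throwing:

public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        // Guard the second token: skip the line if the count is missing
        // instead of letting nextToken() throw NoSuchElementException.
        if (!itr.hasMoreTokens()) {
            break;
        }
        context.write(word, new IntWritable(Integer.parseInt(itr.nextToken())));
    }
}

This drops into the existing mapper class from [1] (it reuses its word field).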

Upvotes: 1
