Reputation: 899
I have this map class [1] whose only goal is to read the content and write it back out in the same format. The input data [2] is processed by the map class. I want the map class to perform no transformation of the data and simply output the input as-is. Unfortunately, I get the error below [3], and I don't understand where the map class is wrong. Any help fixing the map class?
[1] My map class (now corrected).
/** Identity mapper set by the user. */
public static class MyFullyIndentityMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text word = new Text();
    private final static IntWritable one = new IntWritable(1);

    public void map(LongWritable key, Text value, Context context
                    ) throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, new IntWritable(Integer.valueOf(itr.nextToken())));
        }
    }

    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        try {
            while (context.nextKeyValue()) {
                System.out.println("Key ( " + context.getCurrentKey().getClass().getName() + " ): " + context.getCurrentKey()
                        + " Value (" + context.getCurrentValue().getClass().getName() + "): " + context.getCurrentValue());
                map(context.getCurrentKey(), context.getCurrentValue(), context);
            }
        } finally {
            cleanup(context);
        }
    }
}
[2] Input data
B 1
C 1
I 1
O 1
C 1
E 1
B 1
B 1
B 1
B 1
[3] Error that I get during the execution of the map class.
Key ( org.apache.hadoop.io.LongWritable ): 0 Value (org.apache.hadoop.io.Text): B
2015-10-11 11:59:54,680 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : java.util.NoSuchElementException
at java.util.StringTokenizer.nextToken(StringTokenizer.java:349)
at org.apache.hadoop.mapred.examples.MyWordCount$MyFullyIndentityMapper.map(MyWordCount.java:93)
at org.apache.hadoop.mapred.examples.MyWordCount$MyFullyIndentityMapper.run(MyWordCount.java:104)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
[4] My Reduce class
public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context
                       ) throws IOException, InterruptedException {
        int sum = 0;
        Iterator iter = values.iterator();
        while (iter.hasNext()) {
            System.out.println(iter.next());
        }
        for (IntWritable val : values) {
            System.out.println(" - key ( " + key.getClass().toString() + "): " + key.toString()
                    + " value ( " + val.getClass().toString() + " ): " + val.toString());
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }

    public void run(Context context) throws IOException, InterruptedException {
        System.out.println("Output dir: " + context.getConfiguration().get("mapred.output.dir"));
        System.out.println("Partitioner class: " + context.getConfiguration().get("mapreduce.partitioner.class"));
        try {
            while (context.nextKey()) {
                System.out.println("Key: " + context.getCurrentKey());
                reduce(context.getCurrentKey(), context.getValues(), context);
            }
        } finally {
            cleanup(context);
        }
    }
}
[5] Main class
public static void main(String[] args) throws Exception {
    GenericOptionsParser parser = new GenericOptionsParser(new Configuration(), args);
    String[] otherArgs = parser.getRemainingArgs();
    if (otherArgs.length < 2) {
        System.err.println("Usage: wordcount [<in>...] <out>");
        System.exit(2);
    }

    // first map tasks
    JobConf conf = new JobConf(MyWordCount.class);
    conf.setJobName("wordcount");
    conf.setJarByClass(MyWordCount.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);
    conf.setNumReduceTasks(1);

    Path[] inputPaths = new Path[otherArgs.length - 1];
    for (int i = 0; i < otherArgs.length - 1; ++i) {
        inputPaths[i] = new Path(otherArgs[i]);
    }
    Path outputPath = new Path(otherArgs[otherArgs.length - 1]);
    FileInputFormat.setInputPaths(conf, inputPaths);
    FileOutputFormat.setOutputPath(conf, outputPath);

    // launch the job directly
    Job job = new Job(conf, conf.getJobName());
    job.setJarByClass(MyWordCount.class);
    job.setMapperClass(MyFullyIndentityMapper.class);
    job.setReducerClass(MyReducer.class);
    job.setPartitionerClass(HashPartitioner.class);
    job.waitForCompletion(true);
    System.exit(0);
}
[6] Here are the imports I use, in case they are needed.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.StringTokenizer;
Upvotes: 0
Views: 208
Reputation: 899
Found the problem. The signature must be public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {...}
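A minimal reducer skeleton with that signature is sketched below, purely for illustration: the class name SumReducer and the word-count summing body are assumptions, only the method signature is the point. Adding @Override is a cheap safeguard, since the compiler will then refuse to compile the method if it does not actually override Reducer.reduce.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // The parameter types, including Iterable<IntWritable>, must match the
        // Reducer's declared generic types for this method to be called.
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}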
Upvotes: 0
Reputation: 1170
Please check your input file again.
Key ( org.apache.hadoop.io.LongWritable ): 0 Value (org.apache.hadoop.io.Text): B
From the line above you can see that the context is handing your mapper the value B rather than B 1. So when the map method tries to fetch a second token from the StringTokenizer to build the IntWritable value, there is no token left and it throws a NoSuchElementException.
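One way to make this visible (a sketch, not the poster's code) is to guard the second nextToken() call inside map(), so a line that does not match the expected "<word> <count>" format is reported instead of crashing the task. The hasMoreTokens() check and the error message are the only additions; the rest mirrors the map() method from the question and is meant as a drop-in for MyFullyIndentityMapper.

public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        // Only read a second token if one actually follows; otherwise the
        // input line is malformed for the expected "<word> <count>" layout.
        if (itr.hasMoreTokens()) {
            context.write(word, new IntWritable(Integer.valueOf(itr.nextToken())));
        } else {
            System.err.println("Malformed input line, no count after \"" + word + "\": " + value);
        }
    }
}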
Upvotes: 1