Reputation: 4961
Hi, please find my code below, which is throwing an exception.
package HadoopMapReduce;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class HospitalsMapReduce {

    public static class TokenizerMapper extends Mapper<Text, Text, Text, Text> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private Text val = new Text();

        public void map(Text key, Text value, Reducer.Context context) throws IOException, InterruptedException {
            System.out.println("This is Value " + value);
            String rec[] = value.toString().split(",");
            String disease = rec[0];
            String name = rec[1];
            String loc = rec[2];
            int budget = Integer.parseInt(rec[3]);
            int rating = Integer.parseInt(rec[4]);
            String val1 = 1 + "," + name + "," + budget + "," + rating;
            if (loc.equalsIgnoreCase("Pune")) {
                word.set(disease);
                val.set(val1);
                context.write(word, val);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, Text, Text, Text> {

        private Text result = new Text();

        public void reduce(Text key, Iterator<Text> values, Reducer.Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            int budget = 0;
            float avgBudget = 0;
            while (values.hasNext()) {
                String value[] = values.next().toString().split(",");
                sum = sum + Integer.parseInt(value[0]);
                budget = budget + Integer.parseInt(value[2]);
            }
            avgBudget = budget / sum;
            result.set(sum + " " + avgBudget);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://127.0.0.1:9000");
        FileSystem hdfs = FileSystem.get(conf);
        Path output = new Path("/test/output2/");
        if (hdfs.exists(output)) {
            hdfs.delete(output, true);
        }
        Job job = Job.getInstance(conf, "Hospital count");
        job.setJarByClass(HospitalCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
        MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class, Text.class, Text.class);
        FileInputFormat.addInputPath(job, new Path("/test/hospital"));
        FileOutputFormat.setOutputPath(job, output);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Here is my error log:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/NetBeansProjects/BDGRUSDML/Libs/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/NetBeansProjects/BDGRUSDML/Libs/slf4j-nop-1.7.12.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2016-05-29 11:50:41,302 WARN util.NativeCodeLoader (NativeCodeLoader.java:<clinit>(62)) - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2016-05-29 11:50:41,965 INFO Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(1173)) - session.id is deprecated. Instead, use dfs.metrics.session-id
2016-05-29 11:50:41,965 INFO jvm.JvmMetrics (JvmMetrics.java:init(76)) - Initializing JVM Metrics with processName=JobTracker, sessionId=
2016-05-29 11:50:42,024 WARN mapreduce.JobResourceUploader (JobResourceUploader.java:uploadFiles(64)) - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2016-05-29 11:50:42,046 WARN mapreduce.JobResourceUploader (JobResourceUploader.java:uploadFiles(171)) - No job jar file set. User classes may not be found. See Job or Job#setJar(String).
2016-05-29 11:50:42,093 INFO input.FileInputFormat (FileInputFormat.java:listStatus(283)) - Total input paths to process : 1
2016-05-29 11:50:42,148 INFO mapreduce.JobSubmitter (JobSubmitter.java:submitJobInternal(198)) - number of splits:1
2016-05-29 11:50:42,255 INFO mapreduce.JobSubmitter (JobSubmitter.java:printTokens(287)) - Submitting tokens for job: job_local527592655_0001
2016-05-29 11:50:42,439 INFO mapreduce.Job (Job.java:submit(1294)) - The url to track the job: http://localhost:8080/
2016-05-29 11:50:42,440 INFO mapreduce.Job (Job.java:monitorAndPrintJob(1339)) - Running job: job_local527592655_0001
2016-05-29 11:50:42,441 INFO mapred.LocalJobRunner (LocalJobRunner.java:createOutputCommitter(471)) - OutputCommitter set in config null
2016-05-29 11:50:42,450 INFO output.FileOutputCommitter (FileOutputCommitter.java:<init>(100)) - File Output Committer Algorithm version is 1
2016-05-29 11:50:42,455 INFO mapred.LocalJobRunner (LocalJobRunner.java:createOutputCommitter(489)) - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
2016-05-29 11:50:42,537 INFO mapred.LocalJobRunner (LocalJobRunner.java:runTasks(448)) - Waiting for map tasks
2016-05-29 11:50:42,538 INFO mapred.LocalJobRunner (LocalJobRunner.java:run(224)) - Starting task: attempt_local527592655_0001_m_000000_0
2016-05-29 11:50:42,565 INFO output.FileOutputCommitter (FileOutputCommitter.java:<init>(100)) - File Output Committer Algorithm version is 1
2016-05-29 11:50:42,579 INFO mapred.Task (Task.java:initialize(612)) - Using ResourceCalculatorProcessTree : [ ]
2016-05-29 11:50:42,584 INFO mapred.MapTask (MapTask.java:runNewMapper(756)) - Processing split: hdfs://127.0.0.1:9000/test/hospital/hospitals.txt:0+624
2016-05-29 11:50:42,671 INFO mapred.MapTask (MapTask.java:setEquator(1205)) - (EQUATOR) 0 kvi 26214396(104857584)
2016-05-29 11:50:42,672 INFO mapred.MapTask (MapTask.java:init(998)) - mapreduce.task.io.sort.mb: 100
2016-05-29 11:50:42,672 INFO mapred.MapTask (MapTask.java:init(999)) - soft limit at 83886080
2016-05-29 11:50:42,672 INFO mapred.MapTask (MapTask.java:init(1000)) - bufstart = 0; bufvoid = 104857600
2016-05-29 11:50:42,672 INFO mapred.MapTask (MapTask.java:init(1001)) - kvstart = 26214396; length = 6553600
2016-05-29 11:50:42,675 INFO mapred.MapTask (MapTask.java:createSortingCollector(403)) - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
2016-05-29 11:50:42,733 INFO mapred.MapTask (MapTask.java:flush(1460)) - Starting flush of map output
2016-05-29 11:50:42,747 INFO mapred.LocalJobRunner (LocalJobRunner.java:runTasks(456)) - map task executor complete.
2016-05-29 11:50:42,760 WARN mapred.LocalJobRunner (LocalJobRunner.java:run(560)) - job_local527592655_0001
java.lang.Exception: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1072)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:715)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:125)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2016-05-29 11:50:43,444 INFO mapreduce.Job (Job.java:monitorAndPrintJob(1360)) - Job job_local527592655_0001 running in uber mode : false
2016-05-29 11:50:43,446 INFO mapreduce.Job (Job.java:monitorAndPrintJob(1367)) - map 0% reduce 0%
2016-05-29 11:50:43,449 INFO mapreduce.Job (Job.java:monitorAndPrintJob(1380)) - Job job_local527592655_0001 failed with state FAILED due to: NA
2016-05-29 11:50:43,465 INFO mapreduce.Job (Job.java:monitorAndPrintJob(1385)) - Counters: 0
Edit:
After changing Reducer.Context to Context in the map signature, I am able to execute my code now. (With the raw Reducer.Context parameter, the method did not override Mapper.map, so the default identity mapper ran and passed the LongWritable input offsets through as keys, causing the type mismatch above.)
public void map(Text key, Text value, Context context) throws IOException, InterruptedException
Please find the updated code below:
package HadoopMapReduce;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class HospitalsMapReduce {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, Text> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private Text val = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            System.out.println("This is Value " + value);
            String rec[] = value.toString().split(",");
            String disease = rec[0];
            String name = rec[1];
            String loc = rec[2];
            int budget = Integer.parseInt(rec[3]);
            int rating = Integer.parseInt(rec[4]);
            String val1 = 1 + "," + name + "," + budget + "," + rating;
            if (loc.equalsIgnoreCase("Pune")) {
                word.set(disease);
                val.set(val1);
                context.write(word, val);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, Iterator<Text>, Text, Text> {

        private Text result = new Text();

        public void reduce(Text key, Iterator<Text> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            int budget = 0;
            float avgBudget = 0;
            System.out.println("This is Reducer Jobs");
            while (values.hasNext()) {
                String value[] = values.next().toString().split(",");
                System.out.println("This is Value " + value);
                sum = sum + Integer.parseInt(value[0]);
                budget = budget + Integer.parseInt(value[2]);
            }
            avgBudget = budget / sum;
            result.set(sum + " " + avgBudget);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://127.0.0.1:9000");
        FileSystem hdfs = FileSystem.get(conf);
        Path output = new Path("/test/output2/");
        if (hdfs.exists(output)) {
            hdfs.delete(output, true);
        }
        Job job = Job.getInstance(conf, "Hospital_count");
        job.setJarByClass(HospitalsMapReduce.class);
        job.setMapperClass(TokenizerMapper.class);
        //job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
        MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class, Text.class, IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("/test/hospital/"));
        FileOutputFormat.setOutputPath(job, output);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
But now my problem is that the reducer function is not getting executed: my output only shows the output of the map function.
Upvotes: 1
Views: 885
Reputation: 75
Use Iterable in place of the Java Iterator. The framework calls Reducer.reduce(key, Iterable<Text> values, context), so a reduce method declared with an Iterator<Text> parameter never overrides it and is never invoked.
Change your reducer definition and code as follows.
public static class IntSumReducer extends Reducer<Text, Text, Text, Text> {

    private Text result = new Text();

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        System.out.println("This is Red Value ");
        int sum = 0;
        int budget = 0;
        float avgBudget = 0;
        System.out.println("This is Reducer Jobs");
        for (Text val : values) {
            String value[] = val.toString().split(",");
            System.out.println("This is Reduce Value " + val); // print the Text, not the String[] reference
            sum = sum + Integer.parseInt(value[0]);
            budget = budget + Integer.parseInt(value[2]);
        }
        avgBudget = (float) budget / sum; // cast first, otherwise this is integer division
        result.set(sum + " " + avgBudget);
        context.write(key, result);
    }
}
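As a side note, here is a minimal sketch (the class name is illustrative) of how the silent failure could have been caught at compile time: with @Override on reduce, a signature that takes Iterator<Text> no longer compiles, instead of quietly falling back to the framework's identity reduce at run time.

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch only: @Override forces the compiler to check that reduce() really overrides
// Reducer.reduce(KEYIN, Iterable<VALUEIN>, Context); with an Iterator<Text> parameter
// this class would fail to compile rather than run the default identity reducer.
public class OverrideCheckedReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text v : values) {
            context.write(key, v); // placeholder body; the real aggregation goes here
        }
    }
}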
Upvotes: 1
Reputation: 154
Your reducer definition should look like this:
public static class IntSumReducer
        extends Reducer<Text, Text, Text, Text> {
    public void reduce(Text key, Iterable<Text> values,
            Context context) throws IOException, InterruptedException {
        // your logic
    }
}
Upvotes: 0
Reputation: 370
It looks like your combiner is causing the issue. You have used your reducer class as the combiner, but the output format of the map function and of the combiner function is not the same, which should not happen. The combiner is run on the output of the map functions, and its output becomes the input to further combiner runs or to the reduce operation. The reducer expects the same key-value format in the data that reaches it, whether or not that data has passed through the combiner.
Also, from the code written above, I see that computing averages in the combiner function is not the right thing to do: an average of partial averages is not the overall average, so the results will never come out correct.
First of all, remove the combiner, as it is only there to boost performance. Introduce it once you know your code works correctly.
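For illustration, a minimal sketch of a combiner-safe average, assuming a simplified value format of "count,budgetSum" shared by mapper, combiner, and reducer (this is not the format the original mapper emits; the mapper here would emit "1,<budget>"):

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical helper: every stage exchanges "count,budgetSum" pairs, so the combiner
// can run zero, one, or several times without changing the final result. Only partial
// sums are forwarded; no division happens here.
public class PartialSumCombiner extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        long count = 0;
        long budgetSum = 0;
        for (Text v : values) {
            String[] parts = v.toString().split(",");
            count += Long.parseLong(parts[0]);
            budgetSum += Long.parseLong(parts[1]);
        }
        context.write(key, new Text(count + "," + budgetSum));
    }
}

The real reducer would then add up the same pairs and compute (float) budgetSum / count exactly once, after all partial sums have arrived.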
Upvotes: 0
Reputation: 63062
I will summarize your issue as: "My keys and values are all strings (Text), but the Map/Reduce framework believes I am providing numbers (LongWritable)."
Well, I would agree that the source code would likely make that impossible, since all of the mapper/reducer keys and values are Text.
So you might want to look into the packaging of your application jar file, to check whether the correct class versions are being shipped to the Hadoop cluster. Otherwise it does not seem that the code as shown could produce the given exception.
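One concrete way to act on that, sketched here only (the jar path is a placeholder, not taken from the question): the log above already warns "No job jar file set. User classes may not be found.", and the driver can either let Hadoop locate the jar from a class it contains or name the built jar explicitly via Job#setJar.

package HadoopMapReduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

// Sketch under assumptions: "/path/to/hospitals-job.jar" stands in for whatever jar
// you actually build and submit to the cluster.
public class JarCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Hospital_count");
        // Usual route: resolve the jar from the location of a class that lives inside it.
        job.setJarByClass(HospitalsMapReduce.class);
        // When launching from an IDE there may be no jar on the classpath (hence the
        // warning above), so the jar can also be named explicitly.
        job.setJar("/path/to/hospitals-job.jar");
    }
}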
Upvotes: 0