Reputation: 293
We have written a MapReduce job to process log files. We currently have around 52 GB of input, and the job takes about an hour to process it. By default it creates only one reduce task. We often see a timeout error in the reduce task, after which it restarts and completes. Below are the counters from a successful run of the job. Kindly let us know how the performance can be improved.
File System Counters
FILE: Number of bytes read=876100387
FILE: Number of bytes written=1767603407
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=52222279591
HDFS: Number of bytes written=707429882
HDFS: Number of read operations=351
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Failed reduce tasks=1
Launched map tasks=116
Launched reduce tasks=2
Other local map tasks=116
Total time spent by all maps in occupied slots (ms)=9118125
Total time spent by all reduces in occupied slots (ms)=7083783
Total time spent by all map tasks (ms)=3039375
Total time spent by all reduce tasks (ms)=2361261
Total vcore-seconds taken by all map tasks=3039375
Total vcore-seconds taken by all reduce tasks=2361261
Total megabyte-seconds taken by all map tasks=25676640000
Total megabyte-seconds taken by all reduce tasks=20552415744
Map-Reduce Framework
Map input records=49452982
Map output records=5730971
Map output bytes=864140911
Map output materialized bytes=876101077
Input split bytes=13922
Combine input records=0
Combine output records=0
Reduce input groups=1082133
Reduce shuffle bytes=876101077
Reduce input records=5730971
Reduce output records=5730971
Spilled Records=11461942
Shuffled Maps =116
Failed Shuffles=0
Merged Map outputs=116
GC time elapsed (ms)=190633
CPU time spent (ms)=4536110
Physical memory (bytes) snapshot=340458307584
Virtual memory (bytes) snapshot=1082745069568
Total committed heap usage (bytes)=378565820416
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=52222265669
File Output Format Counters
Bytes Written=707429882
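For reference, this is roughly how the job is wired up and how we increase the reducer count. This is only a minimal driver sketch: the driver class name, job name, and paths are illustrative, and the custom input format and reducer registration are omitted.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Illustrative driver only: class name, job name, and paths are not from the real
// project; the custom input format and the reducer class are left out.
public class ExchgLogsDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "exchange-log-processing");
        job.setJarByClass(ExchgLogsDriver.class);

        job.setMapperClass(ExchgLogsMapper.class);
        job.setPartitionerClass(ActualKeyPartitioner.class);
        job.setMapOutputKeyClass(CompositeKey.class);
        job.setMapOutputValueClass(CompositeWritable.class);

        // Going from the default single reducer to several is what triggers the
        // ClassCastException described below (8 is an arbitrary example value).
        job.setNumReduceTasks(8);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}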
I get a ClassCastException, shown below, if I increase the number of reducers. I suspect the issue comes from the partitioner class.
java.lang.Exception: java.lang.ClassCastException: com.emaar.bigdata.exchg.logs.CompositeWritable cannot be cast to org.apache.hadoop.io.Text
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.lang.ClassCastException: com.emaar.bigdata.exchg.logs.CompositeWritable cannot be cast to org.apache.hadoop.io.Text
at com.emaar.bigdata.exchg.logs.ActualKeyPartitioner.getPartition(ActualKeyPartitioner.java:1)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:716)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
at com.emaar.bigdata.exchg.logs.ExchgLogsMapper.map(ExchgLogsMapper.java:56)
at com.emaar.bigdata.exchg.logs.ExchgLogsMapper.map(ExchgLogsMapper.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
Partitioner Code
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
public class ActualKeyPartitioner extends Partitioner<CompositeKey, Text> {
HashPartitioner<Text, Text> hashPartitioner = new HashPartitioner<Text, Text>();
Text newKey = new Text();
@Override
public int getPartition(CompositeKey key, Text value, int numReduceTasks) {
try {
// Execute the default partitioner over the first part of the key
newKey.set(key.getSubject());
return hashPartitioner.getPartition(newKey, value, numReduceTasks);
} catch (Exception e) {
e.printStackTrace();
return (int) (Math.random() * numReduceTasks); // fall back to a random partition in the range [0, numReduceTasks)
}
}
}
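For context, the mapper shown below emits CompositeWritable values, while the partitioner above is typed over Text values, which appears to be where the cast fails once more than one reducer is used. The following is only a sketch of the same partitioner typed over the actual map output value class, assuming that class is CompositeWritable.

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

// Sketch only: identical logic, but the value type parameter matches what the mapper
// emits (assumed to be CompositeWritable), so the framework no longer casts the value to Text.
public class ActualKeyPartitioner extends Partitioner<CompositeKey, CompositeWritable> {

    private final HashPartitioner<Text, CompositeWritable> hashPartitioner =
            new HashPartitioner<Text, CompositeWritable>();
    private final Text newKey = new Text();

    @Override
    public int getPartition(CompositeKey key, CompositeWritable value, int numReduceTasks) {
        // Partition on the subject (the first part of the composite key) so that
        // all records for the same subject reach the same reducer.
        newKey.set(key.getSubject());
        return hashPartitioner.getPartition(newKey, value, numReduceTasks);
    }
}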
Mapper Code
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;
public class ExchgLogsMapper extends Mapper<LongWritable, List<Text>, CompositeKey, Writable> {
String recepientAddresses = "";
public static final String DELIVER = "DELIVER";
public static final String RESOLVED = "Resolved";
public static final String JUNK = "Junk E-mail";
public static final String SEMICOLON = ";";
public static final String FW1 = "FW: ";
public static final String FW2 = "Fw: ";
public static final String FW3 = "FWD: ";
public static final String FW4 = "Fwd: ";
public static final String FW5 = "fwd: ";
public static final String RE1 = "RE: ";
public static final String RE2 = "Re: ";
public static final String RE3 = "re: ";
Text mailType = new Text("NEW");
Text fwType = new Text("FW");
Text reType = new Text("RE");
Text recepientAddr = new Text();
@Override
public void map(LongWritable key, List<Text> values, Context context) throws IOException, InterruptedException {
String subj = null;
int lstSize = values.size();
if (lstSize >= 26) {
if (values.get(8).toString().equals(DELIVER)) {
if (!(ExclusionList.exclusions.contains(values.get(18).toString()))) {
if (!(JUNK.equals((values.get(12).toString())))) {
subj = values.get(17).toString();
recepientAddresses = values.get(11).toString();
String[] recepientAddressArr = recepientAddresses.split(SEMICOLON);
if (subj.startsWith(FW1) || subj.startsWith(FW2) || subj.startsWith(FW3)
|| subj.startsWith(FW4) || subj.startsWith(FW5)) {
mailType = fwType;
subj = subj.substring(4);
} else if (subj.startsWith(RE1) || subj.startsWith(RE2) || subj.startsWith(RE3)) {
mailType = reType;
subj = subj.substring(4);
}
for (int i = 0; i < recepientAddressArr.length; i++) {
CompositeKey ckey = new CompositeKey(subj, values.get(0).toString());
recepientAddr.set(recepientAddressArr[i]);
CompositeWritable out = new CompositeWritable(mailType, recepientAddr, values.get(18),
values.get(0));
context.write(ckey, out);
// System.err.println(out);
}
}
}
}
}
} // end of map()
} // end of class ExchgLogsMapper
Upvotes: 0
Views: 1954
Reputation: 293
There were a few System.out.println calls inside a loop in the reducer code, which wrote a huge amount of log output. After removing them, the reducer finishes in a couple of minutes!
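For illustration, the change amounted to removing per-record logging from the loop over the reducer values. The sketch below is hypothetical (the reducer class, its output types, and field names are not the actual code); only the commented-out println reflects what was removed.

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical reducer sketch: per-record System.out.println calls inside the
// value loop were removed, since they ran millions of times and flooded the task logs.
public class ExchgLogsReducer extends Reducer<CompositeKey, CompositeWritable, Text, Text> {

    private final Text outKey = new Text();
    private final Text outValue = new Text();

    @Override
    protected void reduce(CompositeKey key, Iterable<CompositeWritable> values, Context context)
            throws IOException, InterruptedException {
        for (CompositeWritable value : values) {
            // System.out.println(value); // removed: one log line per reduce input record
            outKey.set(key.getSubject());
            outValue.set(value.toString());
            context.write(outKey, outValue);
        }
    }
}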
Upvotes: 1