Reputation: 11
I am a beginner at MapReduce programming and have written the following Java program to run on a Hadoop cluster comprising 1 NameNode and 3 DataNodes:
package trial;
import java.io.IOException;
import java.util.*;
import java.lang.Iterable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class Trial
{
public static class MapA extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text>
{
public void map(LongWritable key, Text value, OutputCollector<Text,Text> output, Reporter reporter) throws IOException
{
String[] rows = value.toString().split("\r?\n");
for(int i=0;i<rows.length;i++)
{
String[] cols = rows[i].toString().split(",");
String v=cols[0];
for(int j=1;j<cols.length;j++)
{
String k =j+","+cols[j];
output.collect(new Text(k),new Text(v));
}
}
}
}
public static class ReduceA extends MapReduceBase implements Reducer<Text, Text, Text, Text>
{
public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text>output, Reporter reporter) throws IOException
{
int count =0;
String[] attr = key.toString().split(",");
List<String> list = new ArrayList<String>();
while(values.hasNext())
{
list.add((values.next()).toString());
count++;
}
String v=Integer.toString(count);
for(String s:list)
{
output.collect(new Text(s),new Text(v));
}
}
}
public static void main(String[] args) throws IOException
{
JobConf conf1 = new JobConf(Trial.class);
conf1.setJobName("Trial");
conf1.setOutputKeyClass(Text.class);
conf1.setOutputValueClass(Text.class);
conf1.setMapperClass(MapA.class);
//conf.setCombinerClass(Combine.class);
conf1.setReducerClass(ReduceA.class);
conf1.setInputFormat(TextInputFormat.class);
conf1.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf1, new Path(args[0]));
FileOutputFormat.setOutputPath(conf1, new Path(args[1]));
JobClient.runJob(conf1);
JobConf conf2 = new JobConf(Final.class);
conf2.setJobName("Final");
conf2.setOutputKeyClass(Text.class);
conf2.setOutputValueClass(Text.class);
conf2.setMapperClass(Final.MapB.class);
//conf.setCombinerClass(Combine.class);
conf2.setReducerClass(Final.ReduceB.class);
conf2.setInputFormat(TextInputFormat.class);
conf2.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf2, new Path(args[1]));
FileOutputFormat.setOutputPath(conf2, new Path(args[2]));
JobClient.runJob(conf2);
}
}
class Final
{
public static class MapB extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text>
{
public void map(LongWritable key, Text value, OutputCollector<Text,Text> output, Reporter reporter) throws IOException
{
String[] r = value.toString().split("\r?\n");
String[] p1= new String[5];
for(int i=0;i<r.length;i++)
{
p1 = r[i].split("\t");
output.collect(new Text(p1[0]),new Text(p1[1]));
}
}
}
public static class ReduceB extends MapReduceBase implements Reducer<Text, Text, Text, Text>
{
@Override
public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text>output, Reporter reporter) throws IOException
{
int sum=0;
while(values.hasNext())
{
String s = (values.next()).toString();
int c=Integer.parseInt(s);
sum+=c;
}
float avf =(float)sum/3;
String count=Float.toString(avf);
output.collect(key,new Text(count));
}
}
}
The program is run on a dataset like this:
ID1,1,2,3
ID2,1,3,2
ID3,2,2,3
Each row has an ID followed by 3 comma-separated attributes. My problem is to find the frequency of each attribute's value (along the column, not across the row, if the dataset is seen as a 2-D array) for each ID, then sum up the frequencies of the attributes for an ID and find the average. Thus, for the above dataset:
ID1 : (2+2+2)/3 = 2
ID2 : (2+1+1)/3 = 1.33
ID3 : (1+2+2)/3 = 1.67
The above code works well on small datasets (200-500 MB). But for datasets above 1 GB I get an error like this:
map 100% reduce 50%
14/04/12 12:33:06 INFO mapred.JobClient: Task Id : attempt_201404121146_0002_r_000001_0, Status : FAILED
Error: Java heap space
attempt_201404121146_0002_r_000001_0: Exception in thread "LeaseRenewer:hdfs@NameNode:8020" java.lang.OutOfMemoryError: Java heap space
attempt_201404121146_0002_r_000001_0: at org.apache.hadoop.hdfs.LeaseRenewer.renew(LeaseRenewer.java:397)
attempt_201404121146_0002_r_000001_0: at org.apache.hadoop.hdfs.LeaseRenewer.run(LeaseRenewer.java:436)
attempt_201404121146_0002_r_000001_0: at org.apache.hadoop.hdfs.LeaseRenewer.access$700(LeaseRenewer.java:70)
attempt_201404121146_0002_r_000001_0: at org.apache.hadoop.hdfs.LeaseRenewer$1.run(LeaseRenewer.java:297)
attempt_201404121146_0002_r_000001_0: at java.lang.Thread.run(Thread.java:662)
attempt_201404121146_0002_r_000001_0: Exception in thread "Thread for syncLogs" java.lang.OutOfMemoryError: Java heap space
attempt_201404121146_0002_r_000001_0: at java.util.AbstractList.iterator(AbstractList.java:273)
attempt_201404121146_0002_r_000001_0: at org.apache.hadoop.mapred.TaskLog.syncLogs(TaskLog.java:363)
attempt_201404121146_0002_r_000001_0: at org.apache.hadoop.mapred.Child$3.run(Child.java:158)
14/04/12 12:33:10 INFO mapred.JobClient: map 100% reduce 33%
14/04/12 12:33:12 INFO mapred.JobClient: Task Id : attempt_201404121146_0002_r_000003_0, Status : FAILED
Error: Java heap space
attempt_201404121146_0002_r_000003_0: log4j:WARN No appenders could be found for logger (org.apache.hadoop.mapred.Task).
attempt_201404121146_0002_r_000003_0: log4j:WARN Please initialize the log4j system properly.
attempt_201404121146_0002_r_000003_0: log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
14/04/12 12:33:15 INFO mapred.JobClient: map 100% reduce 16%
14/04/12 12:33:16 INFO mapred.JobClient: map 100% reduce 18%
14/04/12 12:33:16 INFO mapred.JobClient: Task Id : attempt_201404121146_0002_r_000000_0, Status : FAILED
Error: Java heap space
attempt_201404121146_0002_r_000000_0: Exception in thread "LeaseRenewer:hdfs@NameNode:8020" java.lang.OutOfMemoryError: Java heap space
attempt_201404121146_0002_r_000000_0: at java.lang.StringCoding.set(StringCoding.java:53)
attempt_201404121146_0002_r_000000_0: at java.lang.StringCoding.decode(StringCoding.java:171)
attempt_201404121146_0002_r_000000_0: at java.lang.String.<init>(String.java:443)
attempt_201404121146_0002_r_000000_0: at java.util.jar.Attributes.read(Attributes.java:401)
attempt_201404121146_0002_r_000000_0: at java.util.jar.Manifest.read(Manifest.java:182)
attempt_201404121146_0002_r_000000_0: at java.util.jar.Manifest.<init>(Manifest.java:52)
attempt_201404121146_0002_r_000000_0: at java.util.jar.JarFile.getManifestFromReference(JarFile.java:167)
attempt_201404121146_0002_r_000000_0: at java.util.jar.JarFile.getManifest(JarFile.java:148)
attempt_201404121146_0002_r_000000_0: at sun.misc.URLClassPath$JarLoader$2.getManifest(URLClassPath.java:696)
attempt_201404121146_0002_r_000000_0: at java.net.URLClassLoader.defineClass(URLClassLoader.java:228)
attempt_201404121146_0002_r_000000_0: at java.net.URLClassLoader.access$000(URLClassLoader.java:58)
attempt_201404121146_0002_r_000000_0: at java.net.URLClassLoader$1.run(URLClassLoader.java:197)
attempt_201404121146_0002_r_000000_0: at java.security.AccessController.doPrivileged(Native Method)
attempt_201404121146_0002_r_000000_0: at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
attempt_201404121146_0002_r_000000_0: at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
attempt_201404121146_0002_r_000000_0: at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
attempt_201404121146_0002_r_000000_0: at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
attempt_201404121146_0002_r_000000_0: at org.apache.hadoop.hdfs.LeaseRenewer.renew(LeaseRenewer.java:400)
attempt_201404121146_0002_r_000000_0: at org.apache.hadoop.hdfs.LeaseRenewer.run(LeaseRenewer.java:436)
attempt_201404121146_0002_r_000000_0: at org.apache.hadoop.hdfs.LeaseRenewer.access$700(LeaseRenewer.java:70)
attempt_201404121146_0002_r_000000_0: at org.apache.hadoop.hdfs.LeaseRenewer$1.run(LeaseRenewer.java:297)
attempt_201404121146_0002_r_000000_0: at java.lang.Thread.run(Thread.java:662)
14/04/12 12:33:21 INFO mapred.JobClient: map 100% reduce 20%
I think my program is consuming too much memory and needs to be optimized. I even tried to solve this by increasing my Java heap space to 1024 MB, but I am still getting the same error. The dataset I used was 1.4 GB, with 50 million (5 crore) rows and 9 attributes excluding the row ID. Since my problem is a big-data one, testing the code with small data is not a solution. Can you please suggest how I can optimize my code so that the memory issue is resolved? Thanks in advance.
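(For reference, the per-task heap can be raised from the driver like this; mapred.child.java.opts is the classic-MapReduce property name, and the exact property may depend on the Hadoop version:)
conf1.set("mapred.child.java.opts", "-Xmx1024m"); // sketch: 1 GB heap per map/reduce task
conf2.set("mapred.child.java.opts", "-Xmx1024m");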
Upvotes: 0
Views: 2446
Reputation: 338
Since the option of traversing the iterator twice is not possible and your heap cannot handle the large amount of values stored in a list, I suggest you add an intermediary MapReduce step, giving a total of three MapReduce steps for your job.
My proposal is as follows:
Step 1
Mapper 1 outputs attributeID + "," + value => UserID.
Reducer 1 computes the total count for each key (attributeID + "," + value). First, it outputs the attributeID + "," + value => UserID pairs as received from Mapper 1. Second, it outputs "." + attributeID + "," + value => total_count. The dot is added as a prefix to ensure that all total_counts arrive at the next Reducer first; this is guaranteed by the sort phase.
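To make this concrete with the small dataset from the question, the key 1,1 (attribute 1 has value 1) would flow through Step 1 like this:

Mapper 1 emits : 1,1 => ID1 and 1,1 => ID2
Reducer 1 emits : 1,1 => ID1 , 1,1 => ID2 and .1,1 => 2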
Step 2
Mapper 2 does nothing other than output every input it receives.
Reducer 2 is guaranteed to receive the total_counts first. As long as a row corresponds to a total_count, it stores it in a HashMap (attributeID + "," + value => total_count). As soon as it starts receiving the other rows, all it has to do is retrieve the corresponding total_count from the HashMap and output UserID => total_count.
Note that only one Reducer should be used in this phase, so you have to set mapreduce.job.reduces to 1. You can reset it to your former value after this step.
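With the old mapred API used in your driver, this is a single call on the intermediary job's JobConf; a minimal sketch, where conf2 stands for that job's configuration:
conf2.setNumReduceTasks(1); // force a single reducer for Step 2, so it sees all total_counts before the UserID rows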
Step 3
Same as the second MapReduce step in your initial solution. It computes the average and outputs UserID => average.
This solution is quite optimistic, as it assumes that your heap can handle your HashMap. Give it a try and see what happens.
Here is some sample code:
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
public class Trial {
public static class MapA extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text>
{
public void map(LongWritable key, Text value, OutputCollector<Text,Text> output, Reporter reporter) throws IOException
{
String[] rows = value.toString().split("\r?\n");
for (int i = 0; i < rows.length; i++) {
String[] cols = rows[i].toString().split(",");
String v = cols[0];
for (int j = 1; j < cols.length; j++) {
String k = j + "," + cols[j];
output.collect(new Text(k), new Text(v));
}
}
}
}
public static class ReduceA extends MapReduceBase implements Reducer<Text, Text, Text, Text>
{
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
int count = 0;
while (values.hasNext()) {
output.collect(key, values.next());
count++;
}
output.collect(new Text("." + key),
new Text(count));
}
}
public static class MapB extends MapReduceBase implements Mapper<Text, Text, Text, Text>
{
public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
{
output.collect(key, value);
}
}
public static class ReduceB extends MapReduceBase implements Reducer<Text, Text, Text, Text>
{
private Map<String, Integer> total_count = new HashMap<String, Integer>();
private Set<String> attributes = new HashSet<String>(); // count the distinct number of attributes
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
String rKey = key.toString();
if(rKey.startsWith(".")){
// a total_count row: remember its count and attributeID for the UserID rows that follow
String attrKey = rKey.substring(1);
while (values.hasNext()) {
total_count.put(attrKey, Integer.valueOf(values.next().toString()));
attributes.add(attrKey.split(",")[0]);
}
return;
}
while (values.hasNext()) {
Text value = values.next();
output.collect(value, new Text(Integer.toString(total_count.get(rKey))));
output.collect(value, new Text("." + attributes.size())); // send the total number of attributes
}
}
}
public static class MapC extends MapReduceBase implements Mapper<Text, Text, Text, Text>
{
public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
{
output.collect(key, value);
}
}
public static class ReduceC extends MapReduceBase implements Reducer<Text, Text, Text, DoubleWritable>
{
@Override
public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, DoubleWritable>output, Reporter reporter) throws IOException
{
long sum = 0;
int nbAttributes = 0;
while(values.hasNext()){
String value = values.next().toString();
if(value.startsWith(".")){ // check if line corresponds to the total number of attributes
nbAttributes = Integer.parseInt(value.substring(1));
} else{
sum += Integer.parseInt(value);
}
}
output.collect(key, new DoubleWritable((double) sum / nbAttributes));
}
}
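// ------------------------------------------------------------------
// Hypothetical driver (a sketch, not part of the answer above): chains
// the three jobs. It assumes that each step's tab-separated output is
// read back with KeyValueTextInputFormat (which matches the Text/Text
// input of MapB and MapC), that Step 2 runs with a single reducer, and
// that a fourth path argument is provided for the final output.
// ------------------------------------------------------------------
public static void main(String[] args) throws IOException {
// Step 1: count, per (attributeID, value), the users that share it
JobConf conf1 = new JobConf(Trial.class);
conf1.setJobName("Step1");
conf1.setOutputKeyClass(Text.class);
conf1.setOutputValueClass(Text.class);
conf1.setMapperClass(MapA.class);
conf1.setReducerClass(ReduceA.class);
conf1.setInputFormat(TextInputFormat.class);
conf1.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf1, new Path(args[0]));
FileOutputFormat.setOutputPath(conf1, new Path(args[1]));
JobClient.runJob(conf1);

// Step 2: replace each UserID's (attributeID, value) by its total_count
JobConf conf2 = new JobConf(Trial.class);
conf2.setJobName("Step2");
conf2.setOutputKeyClass(Text.class);
conf2.setOutputValueClass(Text.class);
conf2.setMapperClass(MapB.class);
conf2.setReducerClass(ReduceB.class);
conf2.setNumReduceTasks(1); // single reducer, as required by Step 2
conf2.setInputFormat(KeyValueTextInputFormat.class);
conf2.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf2, new Path(args[1]));
FileOutputFormat.setOutputPath(conf2, new Path(args[2]));
JobClient.runJob(conf2);

// Step 3: average the counts per UserID
JobConf conf3 = new JobConf(Trial.class);
conf3.setJobName("Step3");
conf3.setMapOutputKeyClass(Text.class);
conf3.setMapOutputValueClass(Text.class); // MapC still emits Text values
conf3.setOutputKeyClass(Text.class);
conf3.setOutputValueClass(DoubleWritable.class); // ReduceC emits the averages
conf3.setMapperClass(MapC.class);
conf3.setReducerClass(ReduceC.class);
conf3.setInputFormat(KeyValueTextInputFormat.class);
conf3.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf3, new Path(args[2]));
FileOutputFormat.setOutputPath(conf3, new Path(args[3]));
JobClient.runJob(conf3);
}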
}
Upvotes: 1