Monami Sen

Reputation: 11

Getting Java heap space error while running MapReduce code on a large dataset

I am a beginner at MapReduce programming and have written the following Java program to run on a Hadoop cluster comprising 1 NameNode and 3 DataNodes:

package trial;

import java.io.IOException;
import java.util.*;
import java.lang.Iterable;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;


public class Trial 
{

public static class MapA extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> 
{

    public void map(LongWritable key, Text value, OutputCollector<Text,Text> output, Reporter reporter) throws IOException
    {
       String[] rows = value.toString().split("\r?\n");          
       for(int i=0;i<rows.length;i++)
       {
           String[] cols = rows[i].toString().split(",");

           String v=cols[0];
           for(int j=1;j<cols.length;j++)
           {
               String k =j+","+cols[j];
               output.collect(new Text(k),new Text(v));
           }
       }


   }
}


public static class ReduceA extends MapReduceBase implements Reducer<Text, Text, Text, Text>
{

        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text>output, Reporter reporter) throws IOException 
        {
            int count =0;                
            String[] attr = key.toString().split(",");      
            List<String> list = new ArrayList<String>();
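            // all values for this key are buffered in this in-memory list before being re-emitted with the count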

           while(values.hasNext())               
            {
                list.add((values.next()).toString());
                count++;

            }

           String v=Integer.toString(count);
           for(String s:list)
           { 
               output.collect(new Text(s),new Text(v));
           }

        }   

}




public static void main(String[] args) throws IOException
{
    JobConf conf1 = new JobConf(Trial.class);
    conf1.setJobName("Trial");

    conf1.setOutputKeyClass(Text.class);
    conf1.setOutputValueClass(Text.class);

    conf1.setMapperClass(MapA.class);
    //conf.setCombinerClass(Combine.class);
    conf1.setReducerClass(ReduceA.class);

    conf1.setInputFormat(TextInputFormat.class);
    conf1.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf1, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf1, new Path(args[1]));

    JobClient.runJob(conf1);

    JobConf conf2 = new JobConf(Final.class);
    conf2.setJobName("Final");

    conf2.setOutputKeyClass(Text.class);
    conf2.setOutputValueClass(Text.class);

    conf2.setMapperClass(Final.MapB.class);
    //conf.setCombinerClass(Combine.class);
    conf2.setReducerClass(Final.ReduceB.class);

    conf2.setInputFormat(TextInputFormat.class);
    conf2.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf2, new Path(args[1]));
    FileOutputFormat.setOutputPath(conf2, new Path(args[2]));

    JobClient.runJob(conf2);


  }


  }  

class Final
{

public static class MapB extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> 
{

    public void map(LongWritable key, Text value, OutputCollector<Text,Text> output, Reporter reporter) throws IOException
    {
       String[] r = value.toString().split("\r?\n");
       String[] p1= new String[5];

       for(int i=0;i<r.length;i++)
       {
           p1 = r[i].split("\t");               
           output.collect(new Text(p1[0]),new Text(p1[1]));
       }

   }
}

 public static class ReduceB extends MapReduceBase implements Reducer<Text, Text, Text, Text>
{

        @Override
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text>output, Reporter reporter) throws IOException 
        {
           int sum=0;
           while(values.hasNext())
           {
               String s = (values.next()).toString();
               int c=Integer.parseInt(s);
               sum+=c;
           }
           float avf =(float)sum/3;
           String count=Float.toString(avf);
           output.collect(key,new Text(count));
        }   

}

}

The program is run on a dataset like this:

ID1,1,2,3
ID2,1,3,2
ID3,2,2,3

Each row has an ID followed by 3 comma-separated attribute values. For each ID, I need to find the frequency of each of its attribute values within that attribute's column (down the column, not across the row, if the dataset is viewed as a 2-D array), then sum those frequencies for the ID and take the average. For example, attribute 1's column is (1, 1, 2), so the value 1 has frequency 2 and the value 2 has frequency 1. Thus, for the above dataset:

ID1 : (2+2+2)/3 = 2
ID2 : (2+1+1)/3 = 1.33
ID3 : (1+2+2)/3 = 1.67

The above code works well with small datasets (200-500 MB), but for datasets above 1 GB I get an error like this:

 map 100% reduce 50%
       14/04/12 12:33:06 INFO mapred.JobClient: Task Id :  attempt_201404121146_0002_r_000001_0, Status : FAILED
      Error: Java heap space
      attempt_201404121146_0002_r_000001_0: Exception in thread  "LeaseRenewer:hdfs@NameNode:8020" java.lang.OutOfMemoryError: Java heap space
      attempt_201404121146_0002_r_000001_0:     at org.apache.hadoop.hdfs.LeaseRenewer.renew(LeaseRenewer.java:397)
     attempt_201404121146_0002_r_000001_0:     at org.apache.hadoop.hdfs.LeaseRenewer.run(LeaseRenewer.java:436)
      attempt_201404121146_0002_r_000001_0:     at org.apache.hadoop.hdfs.LeaseRenewer.access$700(LeaseRenewer.java:70)
     attempt_201404121146_0002_r_000001_0:     at org.apache.hadoop.hdfs.LeaseRenewer$1.run(LeaseRenewer.java:297)
     attempt_201404121146_0002_r_000001_0:     at java.lang.Thread.run(Thread.java:662)
     attempt_201404121146_0002_r_000001_0: Exception in thread "Thread for syncLogs" java.lang.OutOfMemoryError: Java heap space
     attempt_201404121146_0002_r_000001_0:     at java.util.AbstractList.iterator(AbstractList.java:273)
     attempt_201404121146_0002_r_000001_0:     at org.apache.hadoop.mapred.TaskLog.syncLogs(TaskLog.java:363)
     attempt_201404121146_0002_r_000001_0:     at org.apache.hadoop.mapred.Child$3.run(Child.java:158)
     14/04/12 12:33:10 INFO mapred.JobClient:  map 100% reduce 33%
     14/04/12 12:33:12 INFO mapred.JobClient: Task Id :    attempt_201404121146_0002_r_000003_0, Status : FAILED
     Error: Java heap space
      attempt_201404121146_0002_r_000003_0: log4j:WARN No appenders could be found for logger (org.apache.hadoop.mapred.Task).
     attempt_201404121146_0002_r_000003_0: log4j:WARN Please initialize the log4j system properly.
      attempt_201404121146_0002_r_000003_0: log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
     14/04/12 12:33:15 INFO mapred.JobClient:  map 100% reduce 16%
     14/04/12 12:33:16 INFO mapred.JobClient:  map 100% reduce 18%
     14/04/12 12:33:16 INFO mapred.JobClient: Task Id : attempt_201404121146_0002_r_000000_0, Status : FAILED
     Error: Java heap space
      attempt_201404121146_0002_r_000000_0: Exception in thread "LeaseRenewer:hdfs@NameNode:8020" java.lang.OutOfMemoryError: Java heap space
     attempt_201404121146_0002_r_000000_0:     at java.lang.StringCoding.set(StringCoding.java:53)
     attempt_201404121146_0002_r_000000_0:     at java.lang.StringCoding.decode(StringCoding.java:171)
     attempt_201404121146_0002_r_000000_0:     at java.lang.String.<init>(String.java:443)
     attempt_201404121146_0002_r_000000_0:     at java.util.jar.Attributes.read(Attributes.java:401)
      attempt_201404121146_0002_r_000000_0:     at java.util.jar.Manifest.read(Manifest.java:182)
      attempt_201404121146_0002_r_000000_0:     at java.util.jar.Manifest.<init>(Manifest.java:52)
       attempt_201404121146_0002_r_000000_0:     at java.util.jar.JarFile.getManifestFromReference(JarFile.java:167)
       attempt_201404121146_0002_r_000000_0:     at java.util.jar.JarFile.getManifest(JarFile.java:148)
       attempt_201404121146_0002_r_000000_0:     at sun.misc.URLClassPath$JarLoader$2.getManifest(URLClassPath.java:696)
       attempt_201404121146_0002_r_000000_0:     at java.net.URLClassLoader.defineClass(URLClassLoader.java:228)
        attempt_201404121146_0002_r_000000_0:     at java.net.URLClassLoader.access$000(URLClassLoader.java:58)
        attempt_201404121146_0002_r_000000_0:     at java.net.URLClassLoader$1.run(URLClassLoader.java:197)
       attempt_201404121146_0002_r_000000_0:     at      java.security.AccessController.doPrivileged(Native Method)
       attempt_201404121146_0002_r_000000_0:     at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
     attempt_201404121146_0002_r_000000_0:     at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
     attempt_201404121146_0002_r_000000_0:     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
   attempt_201404121146_0002_r_000000_0:     at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
   attempt_201404121146_0002_r_000000_0:     at org.apache.hadoop.hdfs.LeaseRenewer.renew(LeaseRenewer.java:400)
   attempt_201404121146_0002_r_000000_0:     at org.apache.hadoop.hdfs.LeaseRenewer.run(LeaseRenewer.java:436)
  attempt_201404121146_0002_r_000000_0:     at org.apache.hadoop.hdfs.LeaseRenewer.access$700(LeaseRenewer.java:70)
  attempt_201404121146_0002_r_000000_0:     at org.apache.hadoop.hdfs.LeaseRenewer$1.run(LeaseRenewer.java:297)
  attempt_201404121146_0002_r_000000_0:     at java.lang.Thread.run(Thread.java:662)
 14/04/12 12:33:21 INFO mapred.JobClient:  map 100% reduce 20%

I think my program is consuming too much memory and needs to be optimized. I even tried increasing the Java heap space to 1024 MB, but I still get the same error. The dataset I used was 1.4 GB with 5 crore (50 million) rows and 9 attributes per row, excluding the row ID. Since my problem involves big data, testing the code with a small dataset is not a solution. Can you please suggest how I can optimize my code so that the memory issue is resolved? Thanks in advance.
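(For context, the heap these failing reduce attempts run out of is the per-task JVM heap; in MRv1 it is controlled by the mapred.child.java.opts property, which can be set in mapred-site.xml or per job. The snippet below is only illustrative of that setting, not a fix:)

    // illustrative only: raise the map/reduce task JVM heap to 1024 MB for this job
    conf1.set("mapred.child.java.opts", "-Xmx1024m");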

Upvotes: 0

Views: 2446

Answers (1)

blazy

Reputation: 338

Since traversing the values iterator twice is not possible and your heap cannot handle the large number of values stored in a list, I suggest adding an intermediary MapReduce step, giving a total of three MapReduce steps for your job.

My proposal is as follows:

  • Step 1
    Mapper 1 outputs attributeID + "," + value => UserID
    Reducer 1 computes the total count for each key (attributeID + "," + value). First, it outputs the attributeID + "," + value => UserID pairs as received from Mapper 1. Second, it outputs "." + attributeID + "," + value => total_count. The dot prefix ensures that all total_counts reach the next Reducer first; this is guaranteed by the sort phase.

  • Step 2
    Mapper 2 does nothing other than output every input it receives.
    Reducer 2 is guaranteed to receive the total_counts first. As long as a row corresponds to a total_count, it stores it in a HashMap (attributeID + "," + value => total_count). Once it starts receiving the other rows, all it has to do is retrieve the corresponding total_count from the HashMap and output UserID => total_count.
    Note that only one Reducer should be used in this phase, so set mapreduce.job.reduces to 1 (or call setNumReduceTasks(1) on the JobConf; see the driver sketch after the sample code). You can reset it to your former value after this step.

  • Step 3
    Same as the second MapReduce step in your initial solution. Computes the average and outputs UserID => average.

This solution is quite optimistic, as it assumes that your heap can handle your HashMap. Give it a try and see what happens.

Here is some sample code:

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class Trial {

public static class MapA extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> 
{

public void map(LongWritable key, Text value, OutputCollector<Text,Text> output, Reporter reporter) throws IOException
{
        String[] rows = value.toString().split("\r?\n");
        for (int i = 0; i < rows.length; i++) {
            String[] cols = rows[i].toString().split(",");

            String v = cols[0];
            for (int j = 1; j < cols.length; j++) {
                String k = j + "," + cols[j];
                output.collect(new Text(k), new Text(v));
            }
        }
}
}


public static class ReduceA extends MapReduceBase implements Reducer<Text, Text, Text, Text>
{

    public void reduce(Text key, Iterator<Text> values,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {

        int count = 0;

        while (values.hasNext()) {
            output.collect(key, values.next());
            count++;
        }
        output.collect(new Text("." + key),
                new Text(count));
    }  

}


public static class MapB extends MapReduceBase implements Mapper<Text, Text, Text, Text> 
{

public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
{
    output.collect(key, value);
}
}


public static class ReduceB extends MapReduceBase implements Reducer<Text, Text, Text, Text>
{

private Map<String, Integer> total_count = new HashMap<String, Integer>();
private Set<String> attributes = new HashSet<String>(); // count the distinct number of attributes

    public void reduce(Text key, Iterator<Text> values,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {

        String rKey = key.toString();
        if(rKey.startsWith(".")){
            while (values.hasNext()) {
                total_count.put(rKey.substring(1), Integer.valueOf(values.next().toString()));
                attributes.add(rKey.substring(1).split(",")[0]);
                return;
            }
        }
        while (values.hasNext()) {
            Text value = values.next();
            output.collect(value, new Text(Integer.toString(total_count.get(rKey))));
            output.collect(value, new Text("." + attributes.size())); // send the total number of attributes
        }
    }  
}


public static class MapC extends MapReduceBase implements Mapper<Text, Text, Text, Text> 
{

public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
{
        output.collect(key, value);
    }
}

public static class ReduceC extends MapReduceBase implements Reducer<Text, Text, Text, DoubleWritable>
{

    @Override
    public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, DoubleWritable>output, Reporter reporter) throws IOException 
    {
       long sum = 0;
       int nbAttributes = 0;
       while(values.hasNext()){
           String value = values.next().toString();
           if(value.startsWith(".")){ // check if line corresponds to the total number of attributes
               nbAttributes = Integer.parseInt(value.substring(1)); 
           } else{
               sum += Integer.parseInt(value);   
           }
       }
       output.collect(key, new DoubleWritable((double) sum / nbAttributes));
    }   
}

} 
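The sample above omits the driver. As a minimal sketch only (assuming the classes above, a main method inside Trial, and that args[1] and args[2] are free to use as intermediate paths), chaining the three jobs and forcing a single reducer for step 2 might look like this:

public static void main(String[] args) throws IOException {
    // Step 1: emit attributeID,value => UserID and let ReduceA count occurrences
    JobConf conf1 = new JobConf(Trial.class);
    conf1.setJobName("Step1");
    conf1.setOutputKeyClass(Text.class);
    conf1.setOutputValueClass(Text.class);
    conf1.setMapperClass(MapA.class);
    conf1.setReducerClass(ReduceA.class);
    conf1.setInputFormat(TextInputFormat.class);
    conf1.setOutputFormat(TextOutputFormat.class);
    FileInputFormat.setInputPaths(conf1, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf1, new Path(args[1]));
    JobClient.runJob(conf1);

    // Step 2: join every UserID with its total_count; a single reducer is required
    // so that all the dot-prefixed total_count rows land in the same ReduceB instance
    JobConf conf2 = new JobConf(Trial.class);
    conf2.setJobName("Step2");
    conf2.setNumReduceTasks(1);
    conf2.setOutputKeyClass(Text.class);
    conf2.setOutputValueClass(Text.class);
    conf2.setMapperClass(MapB.class);
    conf2.setReducerClass(ReduceB.class);
    conf2.setInputFormat(KeyValueTextInputFormat.class); // step 1 output is key<TAB>value
    conf2.setOutputFormat(TextOutputFormat.class);
    FileInputFormat.setInputPaths(conf2, new Path(args[1]));
    FileOutputFormat.setOutputPath(conf2, new Path(args[2]));
    JobClient.runJob(conf2);

    // Step 3: average the per-attribute counts for each UserID
    JobConf conf3 = new JobConf(Trial.class);
    conf3.setJobName("Step3");
    conf3.setMapperClass(MapC.class);
    conf3.setReducerClass(ReduceC.class);
    conf3.setMapOutputKeyClass(Text.class);
    conf3.setMapOutputValueClass(Text.class);
    conf3.setOutputKeyClass(Text.class);
    conf3.setOutputValueClass(DoubleWritable.class);
    conf3.setInputFormat(KeyValueTextInputFormat.class);
    conf3.setOutputFormat(TextOutputFormat.class);
    FileInputFormat.setInputPaths(conf3, new Path(args[2]));
    FileOutputFormat.setOutputPath(conf3, new Path(args[3]));
    JobClient.runJob(conf3);
}

Adjust the paths and the reducer counts for steps 1 and 3 to whatever suits your cluster.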

Upvotes: 1
