aa8y
aa8y

Reputation: 3942

Normalizing using MapReduce

There is this sample record, 100,1:2:3

Which I want to normalize as,
100,1
100,2
100,3

A colleague of mine wrote a pig script to achieve this and my MapReduce code took more time. I was using the default TextInputformat before. But to improve performance, I decided to write a custom Input format class, with a custom RecordReader. Taking the LineRecordReader class as reference, I tried to write the following code.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

import com.normalize.util.Splitter;

public class NormalRecordReader extends RecordReader<Text, Text> {

    private long start;
    private long pos;
    private long end;
    private LineReader in;
    private int maxLineLength;
    private Text key = null;
    private Text value = null;
    private Text line = null;

    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);

        start = split.getStart();
        end = start + split.getLength();

        final Path file = split.getPath();

        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(split.getPath());

        in = new LineReader(fileIn, job);
        this.pos = start;
    }

    public boolean nextKeyValue() throws IOException {
        int newSize = 0;
        if (line == null) {
            line = new Text();
        }

        while (pos < end) {
            newSize = in.readLine(line);
            if (newSize == 0) {
                break;
            }
            pos += newSize;
            if (newSize < maxLineLength) {
                break;
            }

            // line too long. try again
            System.out.println("Skipped line of size " + newSize + " at pos " + (pos - newSize));
        }
        Splitter splitter = new Splitter(line.toString(), ",");
        List<String> split = splitter.split();

        if (key == null) {
            key = new Text();
        }
        key.set(split.get(0));

        if (value == null) {
            value = new Text();
        }
        value.set(split.get(1));

        if (newSize == 0) {
            key = null;
            value = null;
            return false;

        } else {
            return true;
        }
    }

    @Override
    public Text getCurrentKey() {
        return key;
    }

    @Override
    public Text getCurrentValue() {
        return value;
    }

    /**
     * Get the progress within the split
     */
    public float getProgress() {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float)(end - start));
        }
    }

    public synchronized void close() throws IOException {
        if (in != null) {
            in.close(); 
        }
    }
}

Though this works, but I haven't seen any performance improvement. Here I am breaking the record at "," and setting the 100 as key and 1,2,3 as value. I only call the mapper which does the following:

public void map(Text key, Text value, Context context) 
        throws IOException, InterruptedException {

    try {
        Splitter splitter = new Splitter(value.toString(), ":");
        List<String> splits = splitter.split();

        for (String split : splits) {
            context.write(key, new Text(split));
        }

    } catch (IndexOutOfBoundsException ibe) {
        System.err.println(value + " is malformed.");
    }
}

The splitter class is used to split the data, as I found String's splitter to be slower. The method is:

public List<String> split() {

    List<String> splitData = new ArrayList<String>();
    int beginIndex = 0, endIndex = 0;

    while(true) {

        endIndex = dataToSplit.indexOf(delim, beginIndex);
        if(endIndex == -1) {
            splitData.add(dataToSplit.substring(beginIndex));
            break;
        }

        splitData.add(dataToSplit.substring(beginIndex, endIndex));
        beginIndex = endIndex + delimLength;
    }

    return splitData;
}

Can the code be improved in any way?

Upvotes: 0

Views: 1407

Answers (1)

Charles Menguy
Charles Menguy

Reputation: 41428

Let me summarize here what I think you can improve instead of in the comments:

  • As explained, currently you are creating a Text object several times per record (number of times will be equal to your number of tokens). While it may not matter too much for small input, this can be a big deal for decently sized jobs. To fix that, do the following:

    private final Text text = new Text();
    
    public void map(Text key, Text value, Context context) {
        ....
        for (String split : splits) {
            text.set(split);
            context.write(key, text);
        }
    }
    
  • For your splitting, what you're doing right now is for every record allocating a new array, populating this array, and then iterating over this array to write your output. Effectively you don't really need an array in this case since you're not maintaining any state. Using the implementation of the split method you provided, you only need to make one pass on the data:

    public void map(Text key, Text value, Context context) {
        String dataToSplit = value.toString();
        String delim = ":";
    
        int beginIndex = 0;
        int endIndex = 0;
    
        while(true) {
            endIndex = dataToSplit.indexOf(delim, beginIndex);
            if(endIndex == -1) {
                text.set(dataToSplit.substring(beginIndex));
                context.write(key, text);
                break;
            }
    
            text.set(dataToSplit.substring(beginIndex, endIndex));
            context.write(key, text);
            beginIndex = endIndex + delim.length();
        }
    }
    
  • I don't really see why you write your own InputFormat, it seems that KeyValueTextInputFormat is exactly what you need and has probably been already optimized. Here is how you use it:

    conf.set("key.value.separator.in.input.line", ",");
    job.setInputFormatClass(KeyValueTextInputFormat.class);
    
  • Based on your example, the key for each record seems to be an integer. If that's always the case, then using a Text as your mapper input key is not optimal and it should be an IntWritable or maybe even a ByteWritable depending on what's in your data.

  • Similarly, you want want to use an IntWritable or ByteWritable as your mapper output key and output value.

Also, if you want some meaningful benchmark, you should test on a bigger dataset, like a few Gbs if possible. 1 minute tests are not really meaningful, especially in the context of distributed systems. 1 job may run quicker than another one on a small input, but the trend may be reverted for bigger inputs.

That being said, you should also know that Pig does a lot of optimizations behind the hood when translating to Map/Reduce, so I'm not too surprised that it runs faster than your Java Map/Reduce code and I've seen that in the past. Try the optimizations I suggested, if it's still not fast enough here is a link on profiling your Map/Reduce jobs with a few more useful tricks (especially tip 7 on profiling is something I've found useful).

Upvotes: 1

Related Questions