cliffycheng

Reputation: 381

NLineInputFormat has no effect

I am using Hadoop 0.20.2 with the old API. I'm trying to send chunks of data to my mappers rather than one line at a time (the data spans multiple lines). I've attempted to use NLineInputFormat to set how many lines are delivered at once, but the mapper is still receiving only one line at a time. I'm pretty sure I have the right code. Is there any reason why this would fail to work?

For your reference,

JobConf conf = new JobConf(WordCount.class);
conf.setInt("mapred.line.input.format.linespermap", 2);
conf.setInputFormat(NLineInputFormat.class);

Basically, I'm using the sample code from http://hadoop.apache.org/common/docs/r0.20.2/mapred_tutorial.html#Example%3A+WordCount+v1.0, only swapping TextInputFormat for NLineInputFormat.
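For context, the full driver then looks roughly like this (everything except the two input-format lines is the unchanged WordCount v1.0 setup; Map and Reduce are the tutorial's classes):

    JobConf conf = new JobConf(WordCount.class);
    conf.setJobName("wordcount");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(Map.class);
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);

    // the only changes from the tutorial: NLineInputFormat instead of TextInputFormat
    conf.setInt("mapred.line.input.format.linespermap", 2);
    conf.setInputFormat(NLineInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);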

Thanks in advance

Upvotes: 3

Views: 3372

Answers (2)

Matt Fortier

Reputation: 1223

I solved this problem recently by creating my own InputFormat that extends NLineInputFormat and returns a custom MultiLineRecordReader instead of the default LineRecordReader.

I chose to extend NLineInputFormat because I wanted to have the same guarantee of having exactly N line(s) per split.

This record reader is taken almost as-is from http://bigdatacircus.com/2012/08/01/wordcount-with-custom-record-reader-of-textinputformat/

The only things I modified are the maxLineLength property, which now uses the new-API configuration name, and the value of NLINESTOPROCESS, which is read via NLineInputFormat's getNumLinesPerSplit() (i.e. whatever setNumLinesPerSplit() configured) instead of being hardcoded, for more flexibility.

Here is the result:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.util.LineReader;

public class MultiLineInputFormat extends NLineInputFormat {
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit genericSplit, TaskAttemptContext context) {
        context.setStatus(genericSplit.toString());
        return new MultiLineRecordReader();
    }

    public static class MultiLineRecordReader extends RecordReader<LongWritable, Text> {
        private int NLINESTOPROCESS;   // how many lines are concatenated into one value
        private LineReader in;
        private LongWritable key;
        private Text value = new Text();
        private long start = 0;
        private long end = 0;
        private long pos = 0;
        private int maxLineLength;

        @Override
        public void close() throws IOException {
            if (in != null) {
                in.close();
            }
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            if (start == end) {
                return 0.0f;
            } else {
                return Math.min(1.0f, (pos - start) / (float) (end - start));
            }
        }

        @Override
        public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException {
            // read N from whatever the job configured via NLineInputFormat.setNumLinesPerSplit()
            NLINESTOPROCESS = NLineInputFormat.getNumLinesPerSplit(context);
            FileSplit split = (FileSplit) genericSplit;
            final Path file = split.getPath();
            Configuration conf = context.getConfiguration();
            this.maxLineLength = conf.getInt("mapreduce.input.linerecordreader.line.maxlength", Integer.MAX_VALUE);
            FileSystem fs = file.getFileSystem(conf);
            start = split.getStart();
            end = start + split.getLength();
            boolean skipFirstLine = false;
            FSDataInputStream filein = fs.open(split.getPath());

            if (start != 0) {
                // not at the start of the file: back up one byte and skip the
                // partial line that belongs to the previous split
                skipFirstLine = true;
                --start;
                filein.seek(start);
            }
            in = new LineReader(filein, conf);
            if (skipFirstLine) {
                start += in.readLine(new Text(), 0, (int) Math.min((long) Integer.MAX_VALUE, end - start));
            }
            this.pos = start;
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (key == null) {
                key = new LongWritable();
            }
            key.set(pos);
            if (value == null) {
                value = new Text();
            }
            value.clear();
            final Text endline = new Text("\n");
            int newSize = 0;
            boolean gotData = false;
            // concatenate up to NLINESTOPROCESS lines (newline-separated) into a single value
            for (int i = 0; i < NLINESTOPROCESS; i++) {
                Text v = new Text();
                while (pos < end) {
                    newSize = in.readLine(v, maxLineLength,
                            Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
                    if (newSize == 0) {
                        break;                 // end of stream
                    }
                    value.append(v.getBytes(), 0, v.getLength());
                    value.append(endline.getBytes(), 0, endline.getLength());
                    gotData = true;
                    pos += newSize;
                    if (newSize < maxLineLength) {
                        break;                 // a complete line was read
                    }
                }
                if (newSize == 0) {
                    break;                     // nothing left in this split
                }
            }
            if (!gotData) {
                key = null;
                value = null;
                return false;
            }
            return true;
        }
    }
}
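
To use it, a new-API driver would look roughly like this sketch (the driver class and job name are placeholders, not from the original code):

    Configuration conf = new Configuration();
    Job job = new Job(conf, "multiline-example");     // placeholder job name
    job.setJarByClass(MyDriver.class);                // placeholder driver class
    job.setInputFormatClass(MultiLineInputFormat.class);
    NLineInputFormat.setNumLinesPerSplit(job, 2);     // read back by getNumLinesPerSplit() above
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.waitForCompletion(true);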

Upvotes: 0

Chris White

Reputation: 30089

NLineInputFormat is designed to ensure that all mappers receive the same number of input records (except for the mapper handling the final part of each file).

So by setting that property to 2, each mapper will receive (at most) 2 input key/value pairs over its lifetime; each call to map() still gets a single line, not 2 lines at a time (which is what I think you are looking for).

You should be able to confirm this by looking at the "Map input records" counter for each map task: it should report 2 for most of your mappers.
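
If you really do need to see all of a task's lines together, a minimal old-API sketch (the class name is illustrative, not from a library) is to buffer the lines in map() and process the whole chunk in close():

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    public class ChunkMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {

        private final List<String> lines = new ArrayList<String>();
        private OutputCollector<Text, IntWritable> out;

        public void map(LongWritable key, Text value,
                        OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            out = output;                 // remember the collector for use in close()
            lines.add(value.toString());  // still one line per call, as described above
        }

        @Override
        public void close() throws IOException {
            // all lines from this task's split are now buffered; process them as one chunk
            for (String line : lines) {
                out.collect(new Text(line), new IntWritable(1));
            }
        }
    }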

Upvotes: 4
