Reputation: 3942
I have this sample record: 100,1:2:3
which I want to normalize as:
100,1
100,2
100,3
A colleague of mine wrote a Pig script to achieve this, and it ran faster than my MapReduce code. I had been using the default TextInputFormat, but to improve performance I decided to write a custom InputFormat class with a custom RecordReader. Taking the LineRecordReader class as a reference, I wrote the following code:
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

import com.normalize.util.Splitter;

public class NormalRecordReader extends RecordReader<Text, Text> {

    private long start;
    private long pos;
    private long end;
    private LineReader in;
    private int maxLineLength;
    private Text key = null;
    private Text value = null;
    private Text line = null;

    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);

        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();

        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(split.getPath());

        in = new LineReader(fileIn, job);
        this.pos = start;
    }

    public boolean nextKeyValue() throws IOException {
        int newSize = 0;
        if (line == null) {
            line = new Text();
        }

        while (pos < end) {
            newSize = in.readLine(line);
            if (newSize == 0) {
                break;
            }
            pos += newSize;
            if (newSize < maxLineLength) {
                break;
            }

            // line too long. try again
            System.out.println("Skipped line of size " + newSize + " at pos " + (pos - newSize));
        }

        Splitter splitter = new Splitter(line.toString(), ",");
        List<String> split = splitter.split();

        if (key == null) {
            key = new Text();
        }
        key.set(split.get(0));

        if (value == null) {
            value = new Text();
        }
        value.set(split.get(1));

        if (newSize == 0) {
            key = null;
            value = null;
            return false;
        } else {
            return true;
        }
    }

    @Override
    public Text getCurrentKey() {
        return key;
    }

    @Override
    public Text getCurrentValue() {
        return value;
    }

    /**
     * Get the progress within the split
     */
    public float getProgress() {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float)(end - start));
        }
    }

    public synchronized void close() throws IOException {
        if (in != null) {
            in.close();
        }
    }
}
Though this works, I haven't seen any performance improvement. Here I am breaking the record at "," and setting 100 as the key and 1:2:3 as the value. I only call the mapper, which does the following:
public void map(Text key, Text value, Context context)
        throws IOException, InterruptedException {
    try {
        Splitter splitter = new Splitter(value.toString(), ":");
        List<String> splits = splitter.split();
        for (String split : splits) {
            context.write(key, new Text(split));
        }
    } catch (IndexOutOfBoundsException ibe) {
        System.err.println(value + " is malformed.");
    }
}
The Splitter class is used to split the data, as I found String's split() to be slower. The method is:
public List<String> split() {
    List<String> splitData = new ArrayList<String>();
    int beginIndex = 0, endIndex = 0;

    while(true) {
        endIndex = dataToSplit.indexOf(delim, beginIndex);
        if(endIndex == -1) {
            splitData.add(dataToSplit.substring(beginIndex));
            break;
        }

        splitData.add(dataToSplit.substring(beginIndex, endIndex));
        beginIndex = endIndex + delimLength;
    }

    return splitData;
}
Can the code be improved in any way?
Upvotes: 0
Views: 1407
Reputation: 41428
Let me summarize here what I think you can improve instead of in the comments:
As explained, currently you are creating a Text object several times per record (the number of times will be equal to your number of tokens). While it may not matter too much for small input, this can be a big deal for decently sized jobs. To fix that, do the following:
private final Text text = new Text();

public void map(Text key, Text value, Context context) {
    ....
    for (String split : splits) {
        text.set(split);
        context.write(key, text);
    }
}
For your splitting, what you're doing right now is, for every record, allocating a new array, populating this array, and then iterating over this array to write your output. Effectively you don't really need an array in this case since you're not maintaining any state. Using the implementation of the split method you provided, you only need to make one pass over the data:
public void map(Text key, Text value, Context context) {
    String dataToSplit = value.toString();
    String delim = ":";

    int beginIndex = 0;
    int endIndex = 0;

    while(true) {
        endIndex = dataToSplit.indexOf(delim, beginIndex);
        if(endIndex == -1) {
            text.set(dataToSplit.substring(beginIndex));
            context.write(key, text);
            break;
        }

        text.set(dataToSplit.substring(beginIndex, endIndex));
        context.write(key, text);
        beginIndex = endIndex + delim.length();
    }
}
I don't really see why you write your own InputFormat: it seems that KeyValueTextInputFormat is exactly what you need, and it has probably already been optimized. Here is how you use it:
conf.set("key.value.separator.in.input.line", ",");
job.setInputFormatClass(KeyValueTextInputFormat.class);
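To put that snippet in context, a minimal map-only driver might look roughly like the following sketch. The class names (NormalizeDriver, NormalizeMapper) are placeholders for your own, and note that on newer Hadoop releases the separator property is spelled mapreduce.input.keyvaluelinerecordreader.key.value.separator instead:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NormalizeDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Split each input line into (key, value) at the first ","
        conf.set("key.value.separator.in.input.line", ",");

        Job job = new Job(conf, "normalize");
        job.setJarByClass(NormalizeDriver.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        // NormalizeMapper is the mapper shown above; no reducer is needed
        job.setMapperClass(NormalizeMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}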
Based on your example, the key for each record seems to be an integer. If that's always the case, then using a Text as your mapper input key is not optimal and it should be an IntWritable or maybe even a ByteWritable depending on what's in your data.
Similarly, you want to use an IntWritable or ByteWritable as your mapper output key and output value; a sketch of that idea follows.
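As a rough sketch of what that could look like, assuming you keep KeyValueTextInputFormat (so the mapper still receives Text and parses the integers itself); IntNormalizeMapper is just a placeholder name:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class IntNormalizeMapper extends Mapper<Text, Text, IntWritable, IntWritable> {

    // Reused writables, same idea as reusing the Text object above
    private final IntWritable outKey = new IntWritable();
    private final IntWritable outValue = new IntWritable();

    @Override
    public void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        outKey.set(Integer.parseInt(key.toString()));

        // One pass over "1:2:3", emitting (100, 1), (100, 2), (100, 3)
        String dataToSplit = value.toString();
        int beginIndex = 0;
        while (true) {
            int endIndex = dataToSplit.indexOf(':', beginIndex);
            String token = (endIndex == -1)
                    ? dataToSplit.substring(beginIndex)
                    : dataToSplit.substring(beginIndex, endIndex);
            outValue.set(Integer.parseInt(token));
            context.write(outKey, outValue);
            if (endIndex == -1) {
                break;
            }
            beginIndex = endIndex + 1;
        }
    }
}

The job configuration would then declare the integer types as well, e.g. job.setMapOutputKeyClass(IntWritable.class) and job.setMapOutputValueClass(IntWritable.class), or the setOutputKeyClass/setOutputValueClass variants in a map-only job.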
Also, if you want a meaningful benchmark, you should test on a bigger dataset, like a few GBs if possible. One-minute tests are not really meaningful, especially in the context of distributed systems. One job may run quicker than another on a small input, but the trend may be reversed for bigger inputs.
That being said, you should also know that Pig does a lot of optimizations under the hood when translating to Map/Reduce, so I'm not too surprised that it runs faster than your Java Map/Reduce code; I've seen that in the past. Try the optimizations I suggested, and if it's still not fast enough, here is a link on profiling your Map/Reduce jobs with a few more useful tricks (especially tip 7 on profiling, which I've found useful).
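If you do go the profiling route, Hadoop also ships a built-in HPROF hook you can turn on from the job configuration. This is only a sketch using the older mapred.* property names of that era (they may be spelled differently on newer releases), not necessarily what the linked article describes:

// Profile a couple of map and reduce attempts with HPROF;
// the profiler output ends up next to the task logs.
conf.setBoolean("mapred.task.profile", true);
conf.set("mapred.task.profile.maps", "0-1");
conf.set("mapred.task.profile.reduces", "0-1");
conf.set("mapred.task.profile.params",
        "-agentlib:hprof=cpu=samples,heap=sites,depth=6,force=n,thread=y,verbose=n,file=%s");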
Upvotes: 1