Reputation: 854
I have a job whose only mapper, PrepareData, converts text data to a SequenceFile with VLongWritable keys and DoubleArrayWritable values.
When I run it over a 455000x90 (~384 MB) dataset with lines such as:
13.124,123.12,12.12,... 1.12
23.12,1.5,12.6,... 6.123
...
in local mode it takes 52-53 seconds on average,
but when I run it on a real cluster of these 2 machines (Athlon 64 X2 Dual Core 5600+, 3700+), it takes 81 seconds at best.
The job runs with 4 mappers (block size ~96 MB) and 2 reducers.
The cluster runs Hadoop 0.21.0, configured for JVM reuse.
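For context, a minimal driver for such a job might look like the sketch below. The class name PrepareDataJob, the job name, and the argument handling are assumptions, not from the original post; only the mapper, key/value classes, and SequenceFile output are taken from the question.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class PrepareDataJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("dataDimSize", 90); // row width read by the mapper's setup()

        Job job = new Job(conf, "prepare-data");
        job.setJarByClass(PrepareDataJob.class);
        job.setMapperClass(PrepareDataMapper.class);
        job.setOutputKeyClass(VLongWritable.class);
        job.setOutputValueClass(DoubleArrayWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        // write the (key, vector) pairs out as a SequenceFile
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

This is a configuration sketch only; it needs a running Hadoop 0.21 installation and the project's jars on the classpath.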
Mapper:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapreduce.Mapper;

public class PrepareDataMapper
        extends Mapper<LongWritable, Text, VLongWritable, DoubleArrayWritable> {

    private int size;
    // Writable instances are reused across map() calls to avoid per-record allocation
    private DoubleWritable[] doubleArray;
    private DoubleArrayWritable mapperOutArray = new DoubleArrayWritable();
    private VLongWritable mapOutKey = new VLongWritable();

    @Override
    protected void setup(Context context) throws IOException {
        Configuration conf = context.getConfiguration();
        size = conf.getInt("dataDimSize", 0);
        doubleArray = new DoubleWritable[size];
        for (int i = 0; i < size; i++) {
            doubleArray[i] = new DoubleWritable();
        }
    }

    @Override
    public void map(LongWritable key, Text row, Context context)
            throws IOException, InterruptedException {
        String[] fields = row.toString().split(",");
        for (int i = 0; i < size; i++) {
            doubleArray[i].set(Double.valueOf(fields[i]));
        }
        mapperOutArray.set(doubleArray);
        mapOutKey.set(key.get());
        context.write(mapOutKey, mapperOutArray);
    }
}
DoubleArrayWritable:
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.DoubleWritable;

public class DoubleArrayWritable extends ArrayWritable {

    public DoubleArrayWritable() {
        super(DoubleWritable.class);
    }

    public DoubleArrayWritable(DoubleWritable[] values) {
        super(DoubleWritable.class, values);
    }

    public void set(DoubleWritable[] values) {
        super.set(values);
    }

    public DoubleWritable get(int idx) {
        return (DoubleWritable) get()[idx];
    }

    // returns values[from..to] (inclusive on both ends) as a primitive array
    public double[] getVector(int from, int to) {
        int sz = to - from + 1;
        double[] vector = new double[sz];
        for (int i = from; i <= to; i++) {
            vector[i - from] = get(i).get();
        }
        return vector;
    }
}
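A quick usage sketch of the wrapper (the demo class name is an assumption; it only needs hadoop-common on the classpath). Note that getVector treats both bounds as inclusive:

```java
import org.apache.hadoop.io.DoubleWritable;

public class DoubleArrayWritableDemo {
    public static void main(String[] args) {
        // build a 3-element row: {0.5, 1.5, 2.5}
        DoubleWritable[] row = new DoubleWritable[3];
        for (int i = 0; i < row.length; i++) {
            row[i] = new DoubleWritable(i + 0.5);
        }

        DoubleArrayWritable w = new DoubleArrayWritable(row);
        // inclusive on both ends: indices 0..2 -> {0.5, 1.5, 2.5}
        double[] v = w.getVector(0, 2);
        System.out.println(java.util.Arrays.toString(v));
    }
}
```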
Upvotes: 0
Views: 791
Reputation: 8088
I can guess that the difference is in the job start-up time. In local mode it is a few seconds, while on a cluster it is usually dozens of seconds.
To verify this assumption, you can feed in more data and check that the cluster then performs better than the single node.
An additional possible cause: you might not have enough mappers to fully utilize your hardware. I would suggest trying a number of mappers equal to 2x the number of cores you have.
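One way to get more map tasks without re-blocking the input is to cap the split size in the driver. This is a sketch against the new-API FileInputFormat; the 48 MB figure is an illustration (with ~384 MB of input it would yield roughly 8 splits, about 2 per core on two dual-core boxes), and you should verify the method exists in your exact 0.21 build:

```java
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Somewhere in the driver, after the Job is created:
// cap each input split at 48 MB so the 384 MB input produces ~8 map tasks
FileInputFormat.setMaxInputSplitSize(job, 48L * 1024 * 1024);
```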
Upvotes: 2