Tom Depke
Tom Depke

Reputation: 83

In Hadoop, is it possible to specify the record delimiter for TextOutputFormat?

I see a mechanism to override the delimiter between key and value using mapreduce.textoutputformat.separator (I am using version 1.0.3 of the API). But I want to be able to control the separator between records. FYI, I am using ArrayWritable as the value and NullWritable as the key.

Upvotes: 5

Views: 2023

Answers (2)

Charles Menguy
Charles Menguy

Reputation: 41428

As far as I know this is not possible, because TextOutputFormat uses toString() to get the text representation of the values, and ArrayWritable does not override toString(), so you would probably end up with the default Object.toString() output if you were to write an ArrayWritable to the output of your Reducer. Or maybe you meant changing the separator between lines, in which case it's the same issue, as TextOutputFormat uses a \n character by default, as pointed out by climbage.

That being said, you could do it by implementing a custom output format where you would define your own RecordWriter and have a custom configuration property in the getRecordWriter method. Here is a quick & dirty implementation of such a class (not tested) which should do what you need and let you control the separator for an ArrayWritable via the property mapred.arraywritable.separator and the separator between lines with mapred.line.separator :

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class ArrayTextOutputFormat<K, V> extends TextOutputFormat<K, V> {

protected static class ArrayLineRecordWriter<K, V> extends
        LineRecordWriter<K, V> {


    private static final String utf8 = "UTF-8";
    private final byte[] arraySeparator;
    private final byte[] keyValueSeparator;
    private final byte[] lineSeparator;

    public ArrayLineRecordWriter(DataOutputStream out,
            String keyValueSeparator, String arraySeparator, String lineSeparator) {
        super(out);
        try {
            this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
            this.arraySeparator = arraySeparator.getBytes(utf8);
            this.lineSeparator = lineSeparator.getBytes(utf8);
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8
                    + " encoding");
        }
    }

    private void writeObject(Object o) throws IOException {
        if (o instanceof Text) {
            Text to = (Text) o;
            out.write(to.getBytes(), 0, to.getLength());
        } else if (o instanceof ArrayWritable) {
            ArrayWritable awo = (ArrayWritable) o;
            for (String wrt : awo.toStrings()) {
                out.write(wrt.toString().getBytes(utf8));
                out.write(arraySeparator);
            }
        } else {
            out.write(o.toString().getBytes(utf8));
        }
    }

    public synchronized void write(K key, V value) throws IOException {

        boolean nullKey = key == null || key instanceof NullWritable;
        boolean nullValue = value == null || value instanceof NullWritable;
        if (nullKey && nullValue) {
            return;
        }
        if (!nullKey) {
            writeObject(key);
        }
        if (!(nullKey || nullValue)) {
            out.write(keyValueSeparator);
        }
        if (!nullValue) {
            writeObject(value);
        }
        out.write(lineSeparator);
    }
}

public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
        throws IOException, InterruptedException {
    Configuration conf = job.getConfiguration();
    boolean isCompressed = getCompressOutput(job);
    String keyValueSeparator = conf.get(
            "mapred.textoutputformat.separator", "\t");
    String arraySeparator = conf.get("mapred.arraywritable.separator", "|");
    String lineSeparator = conf.get("mapred.line.separator");
    CompressionCodec codec = null;
    String extension = "";
    if (isCompressed) {
        Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(
                job, GzipCodec.class);
        codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass,
                conf);
        extension = codec.getDefaultExtension();
    }
    Path file = getDefaultWorkFile(job, extension);
    FileSystem fs = file.getFileSystem(conf);
    if (!isCompressed) {
        FSDataOutputStream fileOut = fs.create(file, false);
        return new ArrayLineRecordWriter<K, V>(fileOut, keyValueSeparator,
                arraySeparator, lineSeparator);
    } else {
        FSDataOutputStream fileOut = fs.create(file, false);
        return new ArrayLineRecordWriter<K, V>(new DataOutputStream(
                codec.createOutputStream(fileOut)), keyValueSeparator,
                arraySeparator, lineSeparator);
    }
}
}

Upvotes: 6

Mike Park
Mike Park

Reputation: 10941

Not without writing your own implementation of TextOutputFormat.

TextOutputFormat uses LineRecordWriter to write records. This writer has the record separator hardcoded to \n.

// Class initializer: sets `newline` to the bytes of "\n" encoded with the
// charset named by `utf8` (both are declared outside this excerpt).
static {
  try {
    newline = "\n".getBytes(utf8);
  } catch (UnsupportedEncodingException uee) {
    // Rethrown as unchecked because a static initializer cannot declare
    // checked exceptions.
    throw new IllegalArgumentException("can't find " + utf8 + " encoding");
  }
}

and no option to change it...

/**
 * Writes one record as: key, key/value separator, value, newline.
 * A key or value that is null or a NullWritable is omitted; when both are
 * omitted, nothing is written at all. (writeObject, out, keyValueSeparator
 * and newline are declared outside this excerpt.)
 */
public synchronized void write(K key, V value)
  throws IOException {

  boolean nullKey = key == null || key instanceof NullWritable;
  boolean nullValue = value == null || value instanceof NullWritable;
  // Nothing to write: skip the record entirely, including the newline.
  if (nullKey && nullValue) {
    return;
  }
  if (!nullKey) {
    writeObject(key);
  }
  // The separator is written only when both key and value are present.
  if (!(nullKey || nullValue)) {
    out.write(keyValueSeparator);
  }
  if (!nullValue) {
    writeObject(value);
  }
  // The record terminator is the fixed `newline` field, not a configurable value.
  out.write(newline);
}

Fortunately, it should be relatively easy to roll your own with a slight change.

Upvotes: 1

Related Questions