Reputation: 1266
I have a task which writes Avro output to multiple directories, organized by a few fields of the input records.
For example: process records of countries across years and write them in a directory structure of country/year, e.g.:
outputs/usa/2015/outputs_usa_2015.avro
outputs/uk/2014/outputs_uk_2014.avro
AvroMultipleOutputs multipleOutputs = new AvroMultipleOutputs(context);
....
....
multipleOutputs.write("output", avroKey, NullWritable.get(),
    OUTPUT_DIR + "/" + record.getCountry() + "/" + record.getYear()
        + "/outputs_" + record.getCountry() + "_" + record.getYear());
What output committer would the above code use to write the output? Is it unsafe to use with speculative execution? With speculative execution this causes (or may cause) an org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException.
In the post "Hadoop Reducer: How can I output to multiple directories using speculative execution?" it is suggested to use a custom output committer.
The below code from Hadoop's AvroMultipleOutputs does not state any problem with speculative execution:
private synchronized RecordWriter getRecordWriter(TaskAttemptContext taskContext,
    String baseFileName) throws IOException, InterruptedException {
  writer =
      ((OutputFormat) ReflectionUtils.newInstance(taskContext.getOutputFormatClass(),
          taskContext.getConfiguration())).getRecordWriter(taskContext);
  ...
}
Neither does the write method document any issues if the base output path is outside the job directory:
public void write(String namedOutput, Object key, Object value, String baseOutputPath)
Is there a real issue with AvroMultipleOutputs (and other output classes) under speculative execution when writing outside the job directory? If so, how do I override AvroMultipleOutputs to have its own output committer? I can't see any OutputFormat inside AvroMultipleOutputs whose output committer it uses.
Upvotes: 7
Views: 741
Reputation: 12207
When you add a named output to AvroMultipleOutputs, it will call either AvroKeyOutputFormat.getRecordWriter() or AvroKeyValueOutputFormat.getRecordWriter(), which both call AvroOutputFormatBase.getAvroFileOutputStream(), whose content is:
protected OutputStream getAvroFileOutputStream(TaskAttemptContext context) throws IOException {
  Path path = new Path(((FileOutputCommitter) getOutputCommitter(context)).getWorkPath(),
      getUniqueFile(context,
          context.getConfiguration().get("avro.mo.config.namedOutput", "part"),
          org.apache.avro.mapred.AvroOutputFormat.EXT));
  return path.getFileSystem(context.getConfiguration()).create(path);
}
And AvroOutputFormatBase extends FileOutputFormat (the getOutputCommitter() in the above method is in fact a call to FileOutputFormat.getOutputCommitter()). Hence, AvroMultipleOutputs should have the same constraints as MultipleOutputs.
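To make the constraint concrete, here is a short illustration of my own (not from the call chain above). The safety of a FileOutputFormat-based writer comes entirely from where the file is rooted:

// Illustration (mine, not from the answer). With FileOutputCommitter,
// each task attempt writes under its own scratch directory, e.g.
//   <outputDir>/_temporary/0/_temporary/attempt_..._r_000000_0/
// so two speculative attempts of the same task never open the same HDFS
// file, and commit promotes only the winning attempt's files. A writer
// that opens a fixed path shared by both attempts bypasses this, and the
// second attempt's create/write on that file is what can surface as a
// LeaseExpiredException.
Path safeFile(FileOutputCommitter committer, String uniqueName) {
  return new Path(committer.getWorkPath(), uniqueName); // attempt-scoped
}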
Upvotes: 0
Reputation: 4982
AvroMultipleOutputs will use the OutputFormat which you registered in the Job configuration when adding the named output, e.g. using the addNamedOutput API from AvroMultipleOutputs (e.g. AvroKeyValueOutputFormat).
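For concreteness, a minimal sketch of that driver-side registration (the schemas here are placeholders, not from the question):

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.mapreduce.AvroKeyValueOutputFormat;
import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.hadoop.mapreduce.Job;

public class DriverSetup {
  public static Job configureJob() throws IOException {
    Job job = Job.getInstance();
    // Placeholder schemas; substitute the real record schemas.
    Schema keySchema = Schema.create(Schema.Type.STRING);
    Schema valueSchema = Schema.create(Schema.Type.STRING);
    // "output" must match the name later passed to multipleOutputs.write(...).
    AvroMultipleOutputs.addNamedOutput(
        job, "output", AvroKeyValueOutputFormat.class, keySchema, valueSchema);
    return job;
  }
}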
With AvroMultipleOutputs, you might not be able to use the speculative task execution feature. Even overriding it either would not help or would not be simple.
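A quick stopgap, if overriding is off the table (my suggestion, not part of the original answer), is to turn speculative execution off in the driver:

// Driver-side: with no speculative attempts, only one task attempt
// ever writes a given output path.
job.setSpeculativeExecution(false);
// Equivalent per-phase switches (Hadoop 2.x property names):
job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
job.getConfiguration().setBoolean("mapreduce.reduce.speculative", false);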
The proper fix, instead, is to write your own OutputFormat (most probably extending one of the available Avro output formats, e.g. AvroKeyValueOutputFormat) and override/implement its getRecordWriter API so that it returns a single RecordWriter instance, say MainRecordWriter (the name is just for reference).
This MainRecordWriter would maintain a map of RecordWriter (e.g. AvroKeyValueRecordWriter) instances, each belonging to one of the output files. In the write API of MainRecordWriter, you would look up the actual RecordWriter instance in the map (based on the record you are about to write) and write the record using that record writer. So MainRecordWriter would just act as a wrapper over multiple RecordWriter instances, as in the sketch below.
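Here is a minimal sketch of that idea (mine, not the answer author's). MyRecord stands in for a generated Avro specific record with getCountry()/getYear(), as in the question, and for brevity it opens Avro container files directly with DataFileWriter instead of wrapping AvroKeyValueRecordWriter instances:

import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CountryYearOutputFormat
    extends FileOutputFormat<AvroKey<MyRecord>, NullWritable> {

  @Override
  public RecordWriter<AvroKey<MyRecord>, NullWritable> getRecordWriter(
      final TaskAttemptContext context) throws IOException, InterruptedException {
    // getDefaultWorkFile() resolves inside the committer's attempt-scoped
    // work path, so speculative attempts write to disjoint locations and
    // only the committed attempt's files reach the final output directory.
    final Path baseFile = getDefaultWorkFile(context, ".avro");

    return new RecordWriter<AvroKey<MyRecord>, NullWritable>() {
      // One open Avro container file per country/year subdirectory.
      private final Map<String, DataFileWriter<MyRecord>> writers =
          new HashMap<String, DataFileWriter<MyRecord>>();

      @Override
      public void write(AvroKey<MyRecord> key, NullWritable value) throws IOException {
        MyRecord record = key.datum();
        String subDir = record.getCountry() + "/" + record.getYear();
        DataFileWriter<MyRecord> writer = writers.get(subDir);
        if (writer == null) {
          // Keep the unique per-attempt file name, re-rooted under subDir.
          Path path = new Path(baseFile.getParent(), subDir + "/" + baseFile.getName());
          FileSystem fs = path.getFileSystem(context.getConfiguration());
          OutputStream out = fs.create(path);
          writer = new DataFileWriter<MyRecord>(
              new SpecificDatumWriter<MyRecord>(MyRecord.class));
          writer.create(MyRecord.getClassSchema(), out);
          writers.put(subDir, writer);
        }
        writer.append(record);
      }

      @Override
      public void close(TaskAttemptContext ctx) throws IOException {
        for (DataFileWriter<MyRecord> w : writers.values()) {
          w.close();
        }
      }
    };
  }
}

Because every file is created under the FileOutputCommitter work path, the committer promotes only the successful attempt's files on commit, which is exactly what speculative execution requires.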
For a similar implementation, you might like to study the code of the MultiStorage class from the piggybank library.
Upvotes: 1