M.K

Reputation: 403

How to set multiple Avro schemas with AvroParquetOutputFormat?

In my MapReduce job, I'm using AvroParquetOutputFormat to write Parquet files with an Avro schema.

The application logic requires the Reducer to create several types of files, each with its own Avro schema.

The class AvroParquetOutputFormat has a static method setSchema() that sets the Avro schema of the output. Looking at the code, AvroParquetOutputFormat delegates to AvroWriteSupport.setSchema(), which is also static and stores the schema in the job Configuration, so there is only one schema slot per job.
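To illustrate why a static, Configuration-backed setter limits a job to one schema, here is a minimal JDK-only model (no Hadoop or Parquet classes). `FakeJobConf`, `StaticSchemaSetter` and the key string are hypothetical stand-ins, not the parquet-mr API; the point is only that a second call overwrites the first.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a Hadoop Configuration: one flat key/value map per job.
class FakeJobConf {
    private final Map<String, String> props = new HashMap<>();

    void set(String key, String value) { props.put(key, value); }

    String get(String key) { return props.get(key); }
}

// Hypothetical model of a static schema setter: it can only store one value
// under one key, so a later call replaces the earlier one for the whole job.
final class StaticSchemaSetter {
    static final String SCHEMA_KEY = "example.avro.schema"; // illustrative key, not the real one

    private StaticSchemaSetter() {}

    static void setSchema(FakeJobConf conf, String schemaJson) {
        conf.set(SCHEMA_KEY, schemaJson);
    }
}
```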

Without extending AvroWriteSupport and hacking its logic, is there a simpler way to write output with multiple Avro schemas from AvroParquetOutputFormat in a single MR job?

Any pointers/inputs highly appreciated.

Thanks & Regards

MK

Upvotes: 3

Views: 1576

Answers (1)

Mohitt

Reputation: 2977

It may be quite late to answer, but I also faced this issue and came up with a solution.

First, parquet-mr has no built-in 'MultipleAvroParquetOutputFormat'. To get similar behavior, I used MultipleOutputs.

For a map-only job, write your mapper like this:

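import java.io.IOException;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDecoder;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.parquet.avro.AvroWriteSupport; // parquet-mr >= 1.7; older releases use package parquet.avro

public class EventMapper extends Mapper<LongWritable, BytesWritable, Void, GenericRecord> {

    protected KafkaAvroDecoder deserializer;

    // Using MultipleOutputs to write custom-named files, one per schema
    protected MultipleOutputs<Void, GenericRecord> mos;

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        Configuration conf = context.getConfiguration();
        // The decoder must be initialized before map() runs. One option, assuming a
        // Confluent schema registry whose URL is passed as the (assumed) job
        // property "schema.registry.url":
        deserializer = new KafkaAvroDecoder(
                new CachedSchemaRegistryClient(conf.get("schema.registry.url"), 100));
        mos = new MultipleOutputs<Void, GenericRecord>(context);
    }

    @Override
    public void map(LongWritable ln, BytesWritable value, Context context)
            throws IOException, InterruptedException {
        // getBytes() returns the padded backing array; copyBytes() returns only the valid bytes
        GenericRecord record = (GenericRecord) deserializer.fromBytes(value.copyBytes());
        Schema schema = record.getSchema();
        // Must run before mos.write(): when MultipleOutputs opens a writer for a new
        // base path, that writer reads the schema from the job Configuration.
        AvroWriteSupport.setSchema(context.getConfiguration(), schema);
        // Base path relative to the job output directory, so the output committer still
        // manages the file; produces e.g. <outdir>/schema1-m-00000.parquet
        mos.write((Void) null, record, schema.getName());
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }
}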

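This creates a new RecordWriter for each schema name, and each writer produces its own Parquet file named after the schema, for example schema1-m-00000.parquet (the m becomes r when writing from a reducer). It works because MultipleOutputs creates each writer the first time a base path is used, and the writer picks up whatever schema is in the job Configuration at that moment; that is why setSchema() is called before mos.write().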
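A JDK-only sketch of the caching behaviour described above: one writer per base path, created on first use, capturing the schema that is in the shared configuration at creation time, plus Hadoop-style `<name>-m-<5-digit partition><extension>` file naming. `PerPathWriters` and `FakeWriter` are hypothetical classes that model the idea; they are not Hadoop's MultipleOutputs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical writer: remembers the schema it was created with and the records it received.
class FakeWriter {
    final String fileName;
    final String schema;
    final List<String> records = new ArrayList<>();

    FakeWriter(String fileName, String schema) {
        this.fileName = fileName;
        this.schema = schema;
    }
}

// Hypothetical model of a MultipleOutputs-style cache: one writer per base output path.
class PerPathWriters {
    private final Map<String, String> conf = new HashMap<>(); // shared "job configuration"
    private final Map<String, FakeWriter> writers = new LinkedHashMap<>();
    private final boolean mapOnly;
    private final int partition;

    PerPathWriters(boolean mapOnly, int partition) {
        this.mapOnly = mapOnly;
        this.partition = partition;
    }

    void setSchema(String schema) {
        conf.put("schema", schema);
    }

    // Naming modeled on Hadoop: <base>-m-00000<ext> for map tasks, -r- for reduce tasks.
    String uniqueFile(String base, String extension) {
        return String.format("%s-%s-%05d%s", base, mapOnly ? "m" : "r", partition, extension);
    }

    void write(String record, String basePath) {
        // The schema is read from conf only when a writer is first created for this path.
        FakeWriter w = writers.computeIfAbsent(basePath,
                p -> new FakeWriter(uniqueFile(p, ".parquet"), conf.get("schema")));
        w.records.add(record);
    }

    Map<String, FakeWriter> writers() {
        return writers;
    }
}
```

Note that changing the schema after a writer exists has no effect on that writer, which mirrors why each schema needs its own base path.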

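Without further changes, the job also creates the default part-m-NNNNN.parquet files (part-r-NNNNN in a job with reducers) using the schema set in the driver, even though the mapper never writes to them. To avoid this, set the output format through LazyOutputFormat in the driver instead of calling job.setOutputFormatClass():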

LazyOutputFormat.setOutputFormatClass(job, AvroParquetOutputFormat.class);
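The idea behind LazyOutputFormat is to defer opening the real writer until the first record arrives, so a task that never writes to the default output never creates a part file. A minimal JDK-only model of that idea follows; `LazyWriter` and its opener are hypothetical and only stand in for Hadoop's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model of a lazily opened writer: the underlying "file"
// is only created when the first record is written.
class LazyWriter {
    private final Supplier<List<String>> opener; // creates the "file" when called
    private List<String> file;                    // stays null until the first write

    LazyWriter(Supplier<List<String>> opener) {
        this.opener = opener;
    }

    void write(String record) {
        if (file == null) {
            file = opener.get(); // open on first use only
        }
        file.add(record);
    }

    boolean fileCreated() {
        return file != null;
    }
}
```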

Hope this helps.

Upvotes: 2
