Anup

Reputation: 955

Reading/writing an Avro file in Spark core using Java

I need to access Avro file data from a Java program running on Spark core. I can use the MapReduce InputFormat class, but it gives me a tuple with each record of the file as the key. That is very hard to parse because I am not using Scala.

JavaPairRDD<AvroKey, NullWritable> avroRDD = sc.newAPIHadoopFile("dataset/testfile.avro", AvroKeyInputFormat.class, AvroKey.class, NullWritable.class, new Configuration());

Is there a utility class or JAR that I can use to map Avro data directly into Java classes? For example, the codehaus.jackson package can map JSON to a Java class.

Otherwise, is there another way to easily parse the fields of an Avro file into Java classes or RDDs?

Upvotes: 0

Views: 644

Answers (1)

makagonov

Reputation: 1662

Suppose your Avro file contains serialized pairs, where the key is a String and the value is an Avro class. Then you could write a generic static method in a Utils class that looks like this:

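import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.hadoop.io.NullWritable;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class Utils {

  @SuppressWarnings("unchecked")
  public static <T> JavaPairRDD<String, T> loadAvroFile(JavaSparkContext sc, String avroPath) {
    JavaPairRDD<AvroKey, NullWritable> records = sc.newAPIHadoopFile(avroPath, AvroKeyInputFormat.class, AvroKey.class, NullWritable.class, sc.hadoopConfiguration());
    return records.keys()
        .map(x -> (GenericRecord) x.datum())
        // toString() covers both String and Avro's Utf8, whichever the reader returns
        .mapToPair(rec -> new Tuple2<>(rec.get("key").toString(), (T) rec.get("value")));
  }
}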

And then you could use the method this way:

JavaPairRDD<String, YourAvroClassName> records = Utils.<YourAvroClassName>loadAvroFile(sc, inputDir);
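
Two conversions are worth some care when pulling fields out of a GenericRecord. Depending on the Avro version and data model, string fields may come back as org.apache.avro.util.Utf8 (a CharSequence) rather than java.lang.String, and a generic (T) cast is unchecked because of type erasure, so a wrong type only shows up later, wherever the value is first used. Below is a minimal stdlib-only sketch of both points. It is not Avro code: FieldCasts, asString and as are hypothetical names, a plain Map stands in for the record, and StringBuilder stands in for Utf8.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, for illustration only; not part of Avro or Spark.
class FieldCasts {

    // Safe for java.lang.String and for any other CharSequence
    // (StringBuilder here; Avro's Utf8 would behave the same way).
    public static String asString(Object field) {
        return field == null ? null : field.toString();
    }

    // Checked alternative to an unchecked (T) cast:
    // throws ClassCastException right here if the runtime type is wrong.
    public static <T> T as(Class<T> type, Object field) {
        return type.cast(field);
    }

    public static void main(String[] args) {
        // Stand-in for a record with "key" and "value" fields.
        Map<String, Object> rec = new HashMap<>();
        rec.put("key", new StringBuilder("user-1")); // a CharSequence that is not a String
        rec.put("value", 42);

        System.out.println(asString(rec.get("key")));            // prints user-1
        System.out.println(as(Integer.class, rec.get("value"))); // prints 42

        try {
            as(String.class, rec.get("value")); // wrong type, rejected immediately
        } catch (ClassCastException e) {
            System.out.println("checked cast rejected the value");
        }
    }
}
```

If you want loadAvroFile to fail fast in the same way, one option is to pass a Class<T> argument and use clazz.cast(...) on the value field instead of the (T) cast.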

You might also need to use KryoSerializer and register your custom KryoRegistrator:

sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.set("spark.kryo.registrator", "com.test.avro.MyKryoRegistrator");

The registrator class could look like this:

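package com.test.avro;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.serializers.CollectionSerializer;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.avro.generic.GenericData;
import org.apache.spark.serializer.KryoRegistrator;

public class MyKryoRegistrator implements KryoRegistrator {

  // Collection serializer that always instantiates the given collection class,
  // regardless of the runtime type of the collection being read or copied.
  public static class SpecificInstanceCollectionSerializer<T extends Collection> extends CollectionSerializer {
    private final Class<T> type;

    public SpecificInstanceCollectionSerializer(Class<T> type) {
      this.type = type;
    }

    @Override
    protected Collection create(Kryo kryo, Input input, Class<Collection> type) {
      return kryo.newInstance(this.type);
    }

    @Override
    protected Collection createCopy(Kryo kryo, Collection original) {
      return kryo.newInstance(this.type);
    }
  }

  @Override
  public void registerClasses(Kryo kryo) {
    // Avro POJOs contain java.util.List fields whose runtime type is GenericData.Array.
    // Kryo is not able to serialize GenericData.Array properly, so this serializer
    // handles those lists and creates ArrayList instances for them instead.
    kryo.register(GenericData.Array.class, new SpecificInstanceCollectionSerializer<>(ArrayList.class));
    kryo.register(YourAvroClassName.class);
  }
}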

Upvotes: 1
