Chris Miller

Reputation: 763

Repeated / Duplicate Records with Avro and Spark?

I have a strange thing happening:

import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper, AvroKey}
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.{NullWritable, WritableUtils}

val path = "/path/to/data.avro"

val rdd = sc.newAPIHadoopFile(path, classOf[AvroKeyInputFormat[GenericRecord]], classOf[AvroKey[GenericRecord]], classOf[NullWritable])
rdd.take(10).foreach( x => println( x._1.datum() ))

In this situation, I get the right number of records returned, and if I look at the contents of the RDD I see the individual records as Tuple2s. However, if I println each one as shown above, I get the same record printed every time.

Apparently this has to do with Spark keeping a reference to the item it's iterating over, so I need to clone the object before I use it. However, if I try to clone it, I get:

rdd.take(10).foreach( x => {
  val clonedDatum = x._1.datum().clone()
  println(clonedDatum)
})

<console>:37: error: method clone in class Object cannot be accessed in org.apache.avro.generic.GenericRecord
 Access to protected method clone not permitted because
 prefix type org.apache.avro.generic.GenericRecord does not conform to
 class $iwC where the access take place
                val clonedDatum = x._1.datum().clone()

So, how can I clone the datum?
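For what it's worth, the closest thing to a public clone I've found is Avro's GenericData.deepCopy, which copies a record (nested fields included) given its schema. A minimal sketch of what I mean (GenericData is already imported above), though I don't know whether copying this late, after take has already collected the records, is early enough:

rdd.take(10).foreach( x => {
  val datum = x._1.datum()
  // deepCopy is public, unlike Object.clone(), and copies nested fields as well
  val copied = GenericData.get().deepCopy(datum.getSchema, datum)
  println(copied)
})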

Looks like I'm not the only one who ran into this problem: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/102. I can't figure out how to fix it in my case without hacking away like the person in that linked issue did.

Suggestions?

Upvotes: 0

Views: 1472

Answers (1)

Chris Miller

Reputation: 763

With some help from the mailing list, I think I've mostly figured it out. From the SparkContext API docs:

'''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each record, directly caching the returned RDD or directly passing it to an aggregation or shuffle operation will create many references to the same object. If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first copy them using a map function.

So, this is how I resolved the issue:

val rdd = sc.newAPIHadoopFile(path, 
    classOf[AvroKeyInputFormat[GenericRecord]],
    classOf[AvroKey[GenericRecord]],
    classOf[NullWritable])
  .map(_._1.datum) // <-- added this

rdd.take(10).foreach(println)
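If extracting the datum alone ever turns out not to be enough (in principle the reader could reuse the underlying GenericRecord too), a more defensive variant is to deep-copy each record inside that same map, before anything caches, shuffles, or collects it. A sketch, assuming Avro's GenericData.deepCopy; I haven't needed this myself:

val copiedRdd = sc.newAPIHadoopFile(path,
    classOf[AvroKeyInputFormat[GenericRecord]],
    classOf[AvroKey[GenericRecord]],
    classOf[NullWritable])
  .map { case (key, _) =>
    val datum = key.datum()
    // copy while this task still holds the only reference to the record
    GenericData.get().deepCopy(datum.getSchema, datum)
  }

copiedRdd.take(10).foreach(println)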

Upvotes: 2
