Reputation: 763
I have a strange thing happening:
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper, AvroKey}
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.{NullWritable, WritableUtils}
val path = "/path/to/data.avro"
val rdd = sc.newAPIHadoopFile(path, classOf[AvroKeyInputFormat[GenericRecord]], classOf[AvroKey[GenericRecord]], classOf[NullWritable])
rdd.take(10).foreach( x => println( x._1.datum() ))
In this situation I get the right number of records back, and if I look at the contents of the RDD I see the individual records as Tuple2s. However, if I println each one as shown above, I get the same record printed every time.
Apparently this has to do with Spark keeping a reference to the object it is iterating over, so I need to clone the record before I use it. However, if I try to clone it, I get:
rdd.take(10).foreach( x => {
  val clonedDatum = x._1.datum().clone()
  println(clonedDatum)
})
<console>:37: error: method clone in class Object cannot be accessed in org.apache.avro.generic.GenericRecord
Access to protected method clone not permitted because
prefix type org.apache.avro.generic.GenericRecord does not conform to
class $iwC where the access take place
val clonedDatum = x._1.datum().clone()
So, how can I clone the datum?
Looks like I'm not the only one who ran into this problem: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/102. I can't figure out how to fix it in my case without hacking away like the person in the linked PR did.
Suggestions?
Upvotes: 0
Views: 1472
Reputation: 763
With some help from the mailing list, I think I somewhat figured it out. From the SparkContext API docs:
Note: Because Hadoop's RecordReader class re-uses the same Writable object for each record, directly caching the returned RDD or directly passing it to an aggregation or shuffle operation will create many references to the same object. If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first copy them using a map function.
So, this is how I resolved the issue:
val rdd = sc.newAPIHadoopFile(path,
    classOf[AvroKeyInputFormat[GenericRecord]],
    classOf[AvroKey[GenericRecord]],
    classOf[NullWritable])
  .map(_._1.datum) // <-- added this
rdd.take(10).foreach(println)
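If I ever need to cache or shuffle the records themselves, rather than just take a few and print them, the note above says to make a real copy inside that map. A minimal sketch of what that might look like, using Avro's GenericData.deepCopy helper (I haven't needed this for my simple case, so treat the copiedRdd name and the exact wiring as an illustration, not tested code):

import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable

val copiedRdd = sc.newAPIHadoopFile(path,
    classOf[AvroKeyInputFormat[GenericRecord]],
    classOf[AvroKey[GenericRecord]],
    classOf[NullWritable])
  .map { case (key, _) =>
    val datum = key.datum()
    // deepCopy allocates a fresh record, so caching or shuffling downstream
    // doesn't end up holding many references to one reused object
    GenericData.get().deepCopy(datum.getSchema, datum)
  }

copiedRdd.cache() // safe now that every element is its own copy
copiedRdd.take(10).foreach(println)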
Upvotes: 2