jbrown

Reputation: 7986

How can I read this avro file using spark & scala?

I've seen various Spark and Avro questions (including How can I load Avros in Spark using the schema on-board the Avro file(s)?), but none of the solutions work for me with the following avro file:

http://www.4shared.com/file/SxnYcdgJce/sample.html

When I try to read the avro file using the solution above, I get serialization errors (java.io.NotSerializableException: org.apache.avro.mapred.AvroWrapper).

How can I set up Spark 1.1.0 (using Scala) to read this sample avro file?
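
For reference, the read I'm attempting looks roughly like this (a minimal sketch, assuming sc is the usual SparkContext and reading the records generically; the path is a placeholder):

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.hadoop.io.NullWritable

// Read the file as generic Avro records via the old mapred API.
val avroRdd = sc.hadoopFile("/path/to/sample.avro",
  classOf[AvroInputFormat[GenericRecord]],
  classOf[AvroWrapper[GenericRecord]],
  classOf[NullWritable])

// Any action that has to ship the wrappers/records across the wire
// fails with NotSerializableException under the default Java serializer.
val first = avroRdd.map(_._1.datum).first()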

-- update --

I've moved this to the mailing list: http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-read-this-avro-file-using-spark-amp-scala-td19400.html

Upvotes: 0

Views: 7742

Answers (3)

lpiepiora

Reputation: 13749

I had the same problem when trying to read an Avro file. The reason is that AvroWrapper does not implement the java.io.Serializable interface.

The solution was to use org.apache.spark.serializer.KryoSerializer.

import org.apache.spark.SparkConf

// Use Kryo instead of the default Java serializer, and point Spark
// at a custom Kryo registrator for the Avro-generated classes.
val cfg = new SparkConf().setAppName("MySparkJob")
cfg.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
cfg.set("spark.kryo.registrator", "com.stackoverflow.Registrator")

However, that wasn't enough, as the class stored in the Avro file didn't implement Serializable either.

Therefore I added my own registrator extending KryoRegistrator and included the chill-avro library.

import com.esotericsoftware.kryo.Kryo
import com.twitter.chill.avro.AvroSerializer
import org.apache.spark.serializer.KryoRegistrator

class Registrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // Register each Avro-generated class with a chill-avro serializer
    // so Kryo knows how to handle Avro specific records.
    kryo.register(classOf[MyClassInAvroFile], AvroSerializer.SpecificRecordBinarySerializer[MyClassInAvroFile])
    kryo.register(classOf[AnotherClassInAvroFile], AvroSerializer.SpecificRecordBinarySerializer[AnotherClassInAvroFile])
  }
}

Then I was able to read a file like this:

import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.hadoop.io.NullWritable

ctx.hadoopFile("/path/to/the/avro/file.avro",
  classOf[AvroInputFormat[MyClassInAvroFile]],
  classOf[AvroWrapper[MyClassInAvroFile]],
  classOf[NullWritable]
).map(_._1.datum())
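
For completeness, the ctx above is a SparkContext built from the Kryo-enabled configuration; a minimal sketch:

import org.apache.spark.SparkContext

// Create the context from the cfg that enables Kryo and the registrator;
// only then does the hadoopFile read above deserialize cleanly.
val ctx = new SparkContext(cfg)

With that in place, actions such as count() or collect() on the mapped RDD no longer hit the NotSerializableException.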

Upvotes: 4

jbrown

Reputation: 7986

My solution is to use Spark 1.2 and Spark SQL, as linked to in my question:

val person = sqlContext.avroFile("/tmp/person.avro")
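
For context, avroFile is not on SQLContext out of the box; it comes from the spark-avro package. A minimal sketch, assuming the com.databricks spark-avro artifact is on the classpath and sc is the SparkContext:

import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._  // brings avroFile onto SQLContext

val sqlContext = new SQLContext(sc)
// Load the Avro file using its on-board schema and query it with Spark SQL.
val person = sqlContext.avroFile("/tmp/person.avro")
person.registerTempTable("person")
sqlContext.sql("SELECT * FROM person").collect().foreach(println)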

Upvotes: 1

Tony Z

Reputation: 194

Setting the serializer to Kryo should do the trick.

One way is to add (or uncomment) this line in /etc/spark/conf/spark-defaults.conf:

spark.serializer org.apache.spark.serializer.KryoSerializer
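
Alternatively, the same setting can be applied per job rather than cluster-wide; a minimal sketch (the app name is a placeholder):

import org.apache.spark.SparkConf

// Per-job equivalent of the spark-defaults.conf entry above.
val conf = new SparkConf()
  .setAppName("avro-reader")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")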

Upvotes: 2
