matthieu lieber

Reputation: 662

How to retrieve Avro data from HDFS?

I've created some JSON data and an Avro schema for it:

{"username":"miguno","tweet":"Rock: Nerf paper, scissors is fine.","timestamp": 1366150681 }
{"username":"BlizzardCS","tweet":"Works as intended.  Terran is IMBA.","timestamp": 1366154481 }

and

{ "type" : "record", "name" : "twitter_schema", "namespace" : "com.miguno.avro", "fields" : [ { "name" : "username", "type" : "string", "doc" : "Name of the user account on Twitter.com" }, { "name" : "tweet", "type" : "string", "doc" : "The content of the user's Twitter message" }, { "name" : "timestamp", "type" : "long", "doc" : "Unix epoch time in seconds" } ], "doc:" : "A basic schema for storing Twitter messages" }

I then converted it to Avro as follows:

java -jar ~/avro-tools-1.7.4.jar fromjson --schema-file twitter.avsc twitter.json > twitter.avro

I put the file on HDFS with:

hadoop fs -copyFromLocal twitter.avro <path>

Then, in the Spark shell, I issued the following code:

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.hadoop.io.NullWritable

val path = "hdfs:///path/to/your/avro/folder"
val avroRDD = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](path)

However, when running:

avroRDD.first

I get the following exception:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 2.0 in stage 7.0 (TID 13) had a not serializable result: org.apache.avro.mapred.AvroWrapper
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

What is the solution to this?

Upvotes: 2

Views: 1678

Answers (1)

eugen

Reputation: 5916

Spark is trying to serialize/deserialize your Avro data, but AvroWrapper is not Java-serializable, and Java serialization is the default serializer in Spark.

You have a couple of alternatives:

  • extract the GenericRecord from the wrapper and map each record to some serializable structure (see the sketch right after this list)
  • generate specific record classes and deserialize to those instead of generic records (you will still need to extract the record from the wrapper)
  • enable Kryo serialization (this will only work in some cases)
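
For the first option, here is a minimal sketch built on the avroRDD from your question (the field names come from your posted schema; Avro hands strings back as Utf8 objects, hence the toString calls):

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.hadoop.io.NullWritable

val path = "hdfs:///path/to/your/avro/folder"
val avroRDD = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](path)

// Pull the GenericRecord out of its wrapper and copy the fields into plain,
// serializable Scala values before any collect/shuffle touches them.
val tweets = avroRDD.map { case (wrapper, _) =>
  val record = wrapper.datum()
  (record.get("username").toString,
   record.get("tweet").toString,
   record.get("timestamp").asInstanceOf[Long])
}

tweets.first   // now a plain (String, String, Long) tuple, which Spark can serialize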

Note that Avro reuses record objects internally, so if you simply call rdd.collect on the raw RDD you will end up with all records holding the same values. Mapping the raw input to something else before collecting, as shown above, solves the problem because the map makes a copy of each record.
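
If you want to try the Kryo route (third bullet), here is a sketch of the configuration, assuming you build your own SparkContext instead of using the shell's pre-built sc; as noted, generic records may still give you trouble because their schema has to be available at deserialization time:

import org.apache.spark.{SparkConf, SparkContext}

// Switch the default serializer from Java serialization to Kryo
// (the app name is arbitrary, just an example).
val conf = new SparkConf()
  .setAppName("avro-read")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)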

Upvotes: 3
