Reputation: 662
I've created JSON data and an Avro schema for it:
{"username":"miguno","tweet":"Rock: Nerf paper, scissors is fine.","timestamp": 1366150681 }
{"username":"BlizzardCS","tweet":"Works as intended. Terran is IMBA.","timestamp": 1366154481 }
and
{ "type" : "record", "name" : "twitter_schema", "namespace" : "com.miguno.avro", "fields" : [ { "name" : "username", "type" : "string", "doc" : "Name of the user account on Twitter.com" }, { "name" : "tweet", "type" : "string", "doc" : "The content of the user's Twitter message" }, { "name" : "timestamp", "type" : "long", "doc" : "Unix epoch time in seconds" } ], "doc:" : "A basic schema for storing Twitter messages" }
I then converted it to Avro as follows:
java -jar ~/avro-tools-1.7.4.jar fromjson --schema-file twitter.avsc twitter.json > twitter.avro
I put the file on HDFS with:
hadoop fs -copyFromLocal twitter.avro <path>
Then, in the Spark shell, I ran the following:
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.hadoop.io.NullWritable
val path = "hdfs:///path/to/your/avro/folder"
val avroRDD = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](path)
However, when running:
avroRDD.first
I'm facing the following exception:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2.0 in stage 7.0 (TID 13) had a not serializable result: org.apache.avro.mapred.AvroWrapper
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
What is a solution to this?
Upvotes: 2
Views: 1678
Reputation: 5916
Spark has to serialize your Avro data when it sends the result of first back to the driver, but AvroWrapper is not Java-serializable, and Java serialization is Spark's default serializer.
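You can check this directly in the shell; the wrapper class simply does not implement java.io.Serializable:

classOf[java.io.Serializable].isAssignableFrom(classOf[org.apache.avro.mapred.AvroWrapper[_]])  // false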
You have a couple of alternatives; the one described here is to map the raw Avro records to plain, serializable values (a tuple, a case class, etc.) right after reading them, before calling an action such as first or collect.
Note also that the GenericRecord objects are reused internally by the input format, so if you do, for example, rdd.collect directly, you will end up with all records having the same values. Mapping the raw input data to something else before doing the collect solves the problem as well, because the map produces a copy of each record.
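For example, here is a minimal sketch of that mapping, using the avroRDD from the question; the Tweet case class and the field accesses are illustrative and follow the schema above:

case class Tweet(username: String, tweet: String, timestamp: Long)

val tweets = avroRDD.map { case (wrapper, _) =>
  val record = wrapper.datum()                       // the underlying GenericRecord
  Tweet(
    record.get("username").toString,                 // Avro strings come back as Utf8, so convert them
    record.get("tweet").toString,
    record.get("timestamp").asInstanceOf[Long])
}

tweets.first                                         // now a plain, serializable Scala object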
Upvotes: 3