Reputation: 595
I am trying to save an RDD to a file in Avro format. This is what my code looks like:
val output = s"/test/avro/${date.toString(dayFormat)}"
rmr(output) // delete the output path
rdd.coalesce(64).saveAsNewAPIHadoopFile(
  output,
  classOf[org.apache.hadoop.io.NullWritable],
  classOf[PageViewEvent],
  classOf[AvroKeyValueOutputFormat[org.apache.hadoop.io.NullWritable, PageViewEvent]],
  spark.hadoopConfiguration)
}
When I run this I get an error saying:
Unsupported input type PageViewEvent
The type of the RDD is RDD[(Null, PageViewEvent)]. Can someone explain what I am doing wrong? Thanks in advance.
Upvotes: 4
Views: 6527
Reputation: 595
So I managed to find a 'workaround':
val job = new Job(spark.hadoopConfiguration)
AvroJob.setOutputKeySchema(job, PageViewEvent.SCHEMA$)
val output = s"/avro/${date.toString(dayFormat)}"
rmr(output)
rdd.coalesce(64).map(x => (new AvroKey(x._1), x._2))
  .saveAsNewAPIHadoopFile(
    output,
    classOf[PageViewEvent],
    classOf[org.apache.hadoop.io.NullWritable],
    classOf[AvroKeyOutputFormat[PageViewEvent]],
    job.getConfiguration)
This works fine. I no longer use AvroKeyValueOutputFormat, although I think I would be able to now. The key changes were to wrap the record in an AvroKey and to set the output key schema with AvroJob.setOutputKeySchema.
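For anyone who wants the pieces in one place, here is a minimal, self-contained sketch of the same idea with the imports spelled out. It is not the exact code from my job: the object name AvroSave, the helper saveAsAvro and its parameters are made up for illustration, and I use GenericRecord as a stand-in record type (with a generated class you would use PageViewEvent and PageViewEvent.SCHEMA$ instead). As far as I can tell, the original "Unsupported input type" error came from Avro not knowing how to convert a bare record class, which is why wrapping it in AvroKey helps.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.{AvroJob, AvroKeyOutputFormat}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical helper object; the names here are illustrative only.
object AvroSave {

  // Writes `records` as Avro container files under `output`.
  // Assumption: `schema` is the writer schema of every record in the RDD.
  def saveAsAvro(sc: SparkContext,
                 records: RDD[GenericRecord],
                 schema: Schema,
                 output: String): Unit = {
    // Copy the Hadoop configuration into a Job so AvroJob can store the schema in it.
    val job = Job.getInstance(sc.hadoopConfiguration)
    AvroJob.setOutputKeySchema(job, schema)

    records
      // AvroKeyOutputFormat expects (AvroKey[T], NullWritable) pairs.
      .map(r => (new AvroKey[GenericRecord](r), NullWritable.get()))
      .saveAsNewAPIHadoopFile(
        output,
        classOf[AvroKey[GenericRecord]],
        classOf[NullWritable],
        classOf[AvroKeyOutputFormat[GenericRecord]],
        job.getConfiguration) // must be the job's configuration, since it carries the schema
  }
}
```

You would call it as AvroSave.saveAsAvro(sc, rdd, schema, output) after deleting the output path. On older Spark versions you may also need import org.apache.spark.SparkContext._ for the pair RDD functions to be picked up.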
Upvotes: 2