Reputation: 511
I am working on a Spark job in Scala that reads data from Avro files. The beginning is quite simple:
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.hadoop.io.NullWritable

val path = "hdfs:///path/to/your/avro/folder"
val avroRDD = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](path)
But later it gets less elegant, as I need to operate on tuples, i.e.:
avroRDD.map(x => (x.get("value").asInstanceOf[Long],x.get("start_time").asInstanceOf[Long],x.get("level").asInstanceOf[Double],x.get("size").asInstanceOf[Long],x.get("category").asInstanceOf[String])).
map(x => (asDate(x._2),(x._1,x._3,x._4,x._5))).
reduceByKey((x,y) => (x._1+y._1,x._2+y._2,x._3+y._3,y._4)).
map(x => List(x._1,x._2._1,x._2._2,x._2._3,x._2._4).mkString(","))
...
I was thinking about using a Map instead of a tuple, but if I have several different types, e.g. Long and String, it results in a Map[String,Any]
and casting on every operation, i.e.:
avroRDD.map(x => Map("value" -> x.get("value").asInstanceOf[Long],"start_time" -> x.get("start_time").asInstanceOf[Long],"level" -> x.get("level").asInstanceOf[Double],"size" -> x.get("size").asInstanceOf[Long],"category" -> x.get("category").asInstanceOf[String])).
map(x => (asDate(x.get("start_time).asInstanceOf[Long]),(x.get("value").asInstanceOf[Long],x.get("level").asInstanceOf[Double],x.get("size").asInstanceOf[Long],x.get("category").asInstanceOf[String]))).
...
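One middle ground, if you do go the Map route, is to keep the cast in a single place rather than at every call site. A minimal sketch (Row and as are invented names, not part of any library):

```scala
// Thin wrapper over Map[String, Any] that centralizes the asInstanceOf casts.
// Names are illustrative only.
case class Row(fields: Map[String, Any]) {
  def as[T](name: String): T = fields(name).asInstanceOf[T]
}

// Usage: row.as[Long]("value") instead of row.fields("value").asInstanceOf[Long]
```

This does not buy any compile-time safety, but the casting noise disappears from the transformation chain.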
An alternative solution is to use case classes and wrap the values in them, but sometimes this could lead to a lot of case class definitions, i.e.:
case class TestClass(value: Long, level:Double, size:Long, category:String)
avroRDD.map(x => (x.get("start_time").asInstanceOf[Long],TestClass(x.get("value").asInstanceOf[Long],x.get("level").asInstanceOf[Double],x.get("size").asInstanceOf[Long],x.get("category").asInstanceOf[String]))).
map(x => (asDate(x._1),x._2)).
reduceByKey((x,y) => TestClass(x.value+y.value,x.level+y.level,x.size+y.size,y.category)).
map(x => List(x._1,x._2.value,x._2.level,x._2.size,x._2.category).mkString(","))
...
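If the case class route wins, the repeated asInstanceOf calls can at least be pushed into one factory so each map step stays clean. A sketch, where the get parameter stands in for GenericRecord.get and fromFields is an invented name:

```scala
case class TestClass(value: Long, level: Double, size: Long, category: String)

object TestClass {
  // Hypothetical factory: all the casting happens once, here, instead of
  // being repeated inside every map step.
  def fromFields(get: String => Any): TestClass =
    TestClass(
      get("value").asInstanceOf[Long],
      get("level").asInstanceOf[Double],
      get("size").asInstanceOf[Long],
      get("category").asInstanceOf[String])
}

// With the Avro record in scope this would read roughly:
// avroRDD.map(x => (x.get("start_time").asInstanceOf[Long], TestClass.fromFields(x.get)))
```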
I am wondering whether there is a better way of handling GenericRecords in such situations, one where you do not need to constantly cast to specific types and can operate on field names. Something like a named tuple would do the job.
Do you know a better way?
How do you handle such cases?
Upvotes: 1
Views: 962
Reputation: 170723
With pattern matching:
map { case (value, startTime, level, size, category) =>
  (asDate(startTime), (value, level, size, category))
}.reduceByKey { case ((value1, level1, size1, category1), (value2, level2, size2, category2)) =>
  (value1 + value2, level1 + level2, size1 + size2, category2)
}.map { case (startTime, (value, level, size, category)) =>
  List(startTime, value, level, size, category).mkString(",")
}
If you have some tuples which get reused often, use case classes for them.
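For instance, the reduce step above could move into a case class that knows how to combine itself (Stats and merge are invented names, sketched without the Spark context):

```scala
// Sketch: the combine logic lives on the case class, so the Spark call
// shrinks to reduceByKey(_ merge _). Names are illustrative.
case class Stats(value: Long, level: Double, size: Long, category: String) {
  def merge(other: Stats): Stats =
    Stats(value + other.value, level + other.level, size + other.size, other.category)
}
```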
Upvotes: 2