Reputation: 2661
I've added the following code:
var counters: Map[String, Int] = Map()
val results = rdd
  .filter(l => l.contains("xyz"))
  .map(l => mapEvent(l))
  .filter(r => r.isDefined)
  .map { i =>
    val date = i.get.getDateTime.toString.substring(0, 10)
    counters = counters.updated(date, counters.getOrElse(date, 0) + 1)
  }
I want to get counts for the different dates in the RDD in a single iteration. But when I run this I get a message saying:
No implicits found for parameters evidence$6: Encoder[Unit]
So I added this line:
implicit val myEncoder: Encoder[Unit] = org.apache.spark.sql.Encoders.kryo[Unit]
But then I get this error:
Exception in thread "main" java.lang.ExceptionInInitializerError
at com.xyz.SparkBatchJob.main(SparkBatchJob.scala)
Caused by: java.lang.UnsupportedOperationException: Primitive types are not supported.
at org.apache.spark.sql.Encoders$.genericSerializer(Encoders.scala:200)
at org.apache.spark.sql.Encoders$.kryo(Encoders.scala:152)
How do I fix this? Or is there a better way to get the counts I want in a single iteration (O(N) time)?
Upvotes: 0
Views: 526
Reputation: 4133
A Spark RDD is a representation of a distributed collection. When you apply a map function to an RDD, the function you use to manipulate the collection is executed across the cluster, so there is no sense in mutating a variable created outside the scope of the map function: the closure (including a copy of that variable) is serialized and shipped to the executors, each executor mutates its own copy, and the driver's variable is never updated.
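A minimal illustration of the lost update, assuming rdd is an RDD[String] and using each line itself as the key just to keep the example short (the same applies to your map):

var counters: Map[String, Int] = Map()
rdd.foreach { l =>
  // Runs on the executors against a serialized copy of `counters`.
  counters = counters.updated(l, counters.getOrElse(l, 0) + 1)
}
println(counters) // prints Map() on the driver: the executors' copies are discarded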
In your code, the problem is that you don't return any value; instead you try to mutate a structure, so the compiler infers that the RDD created by the transformation is an RDD[Unit].
If you need to create a Map as the result of a Spark action, you should create a pair RDD and then apply a reduce operation such as reduceByKey; a sketch follows.
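A minimal sketch of that approach, assuming rdd is an RDD[String] and that mapEvent returns an Option of an event with a getDateTime method (both inferred from the code in the question; adjust to your actual types):

val countsByDate: collection.Map[String, Int] = rdd
  .filter(l => l.contains("xyz"))
  .flatMap(l => mapEvent(l))   // flatMap over the Option drops the Nones, no .get needed
  .map(e => (e.getDateTime.toString.substring(0, 10), 1)) // (date, 1) pairs
  .reduceByKey(_ + _)          // combine counts per date
  .collectAsMap()              // bring the small per-date result back to the driver

reduceByKey pre-aggregates the counts on each executor before the shuffle, and collectAsMap returns the per-date totals to the driver in one job.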
Include the type of the rdd and the mapEvent function and the sketch above can be made concrete.
Spark builds a DAG from the transformations and the action; it does not process the data twice, so this still touches each record once (O(N)).
Upvotes: 1