DilTeam

Reputation: 2661

Scala: java.lang.UnsupportedOperationException: Primitive types are not supported

I've added the following code:

var counters: Map[String, Int] = Map()
val results = rdd
  .filter(l => l.contains("xyz"))
  .map(l => mapEvent(l))
  .filter(r => r.isDefined)
  .map { i =>
    val date = i.get.getDateTime.toString.substring(0, 10)
    counters = counters.updated(date, counters.getOrElse(date, 0) + 1)
  }

I want to get counts for the different dates in the RDD in a single iteration. But when I run this I get a message saying:

No implicits found for parameters evidence$6: Encoder[Unit]

So I added this line:

  implicit val myEncoder: Encoder[Unit] = org.apache.spark.sql.Encoders.kryo[Unit]

But then I get this error:

Exception in thread "main" java.lang.ExceptionInInitializerError
    at com.xyz.SparkBatchJob.main(SparkBatchJob.scala)
Caused by: java.lang.UnsupportedOperationException: Primitive types are not supported.
    at org.apache.spark.sql.Encoders$.genericSerializer(Encoders.scala:200)
    at org.apache.spark.sql.Encoders$.kryo(Encoders.scala:152)

How do I fix this? OR Is there a better way to get the counts I want in a single iteration (O(N) time)?

Upvotes: 0

Views: 526

Answers (1)

Emiliano Martinez

Reputation: 4133

A Spark RDD is a representation of a distributed collection. When you apply a map function to an RDD, the function you pass is executed across the cluster, so it makes no sense to mutate a variable created outside the scope of that function.

In your code, the problem is that you don't return any value; instead you try to mutate a structure, and for that reason the compiler infers that the RDD created by the transformation is an RDD[Unit].

If you need to create a Map as the result of a Spark action, you must create a pair RDD and then apply a reduce operation.

Include the type of your RDD and the mapEvent function in the question, and I can show how it could be done.

Spark builds a DAG from the transformations and the action; it does not process the data twice.
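As a sketch of the pair-RDD approach, assuming rdd is an RDD[String] and that mapEvent returns an Option of some event type exposing getDateTime (neither type is shown in the question), the per-date counts could look like this:

```scala
// Sketch only: `mapEvent` and the event type are assumptions based on the question.
val countsByDate: scala.collection.Map[String, Int] = rdd
  .filter(_.contains("xyz"))
  .flatMap(mapEvent)                                       // keeps only the defined results
  .map(e => (e.getDateTime.toString.substring(0, 10), 1))  // (date, 1) pairs
  .reduceByKey(_ + _)                                      // combine counts per date on the cluster
  .collectAsMap()                                          // bring the small result map to the driver
```

This is still a single pass over the data: the filter, flatMap, and map are narrow transformations that Spark fuses into one stage, and reduceByKey aggregates partially on each executor before shuffling.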

Upvotes: 1
