Sparker0i

Reputation: 1861

Dataset forEach loop in scala throwing SparkException Task not serializable

My question is really similar to this one, except that I'm using Scala.

For the below code:

        roleList = res.select($"results", explode($"results").as("results_flat1"))
                        .select("results_flat1.*")
                        .select(explode($"rows").as("rows"))
                        .select($"rows".getItem(0).as("x"))
                        .withColumn("y", trim(col("x")))
                        .select($"y".as("ROLE_NAME"))
                        .map(row => Role(row.getAs[String](0)))

        if (roleList.count() != 0) {
            println(s"Number of Roles = ${roleList.count()}")

            roleList.foreach{role =>
                var status = ""

                do {
                    val response = getDf()
                    response.show()

                    status = response.select("status").head().getString(0)
                    var error = ""

                    error = response.select($"results", explode($"results").as("results_flat1"))
                                .select("results_flat1.*")
                                .select($"error")
                                .first().get(0).asInstanceOf[String]
                }
                while (status != "completed")
            }
        }

I'm getting the following exception:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:926)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:925)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:925)
    at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply$mcV$sp(Dataset.scala:2716)
    at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2716)
    at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2716)
    at org.apache.spark.sql.Dataset$$anonfun$withNewRDDExecutionId$1.apply(Dataset.scala:3349)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withNewRDDExecutionId(Dataset.scala:3345)
    at org.apache.spark.sql.Dataset.foreach(Dataset.scala:2715)
    at com.cloud.operations.RoleOperations.createRoles(RoleOperations.scala:30)
    at com.cloud.Main$.main(Main.scala:24)
    at com.cloud.Main.main(Main.scala)
Caused by: java.io.NotSerializableException: com.cloud.operations.RoleOperations
Serialization stack:
    - object not serializable (class: com.cloud.operations.RoleOperations, value: com.cloud.operations.RoleOperations@67a3394c)
    - field (class: com.cloud.operations.RoleOperations$$anonfun$createRoles$1, name: $outer, type: class com.cloud.operations.RoleOperations)
    - object (class com.cloud.operations.RoleOperations$$anonfun$createRoles$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
    ... 21 more

RoleOperations.scala:30 refers to the line where roleList.foreach starts.

I'm not quite sure why this is happening. Going by the linked question's answer, I'm not using the SparkContext anywhere in my code, although getDf() does use spark.read.json (from SparkSession). Even so, the exception is not thrown at that line, but at the line above it, which is really confusing me. Please help with this.

Upvotes: 0

Views: 1679

Answers (1)

Vladislav Varslavans

Reputation: 2944

First of all, you can't use a SparkSession in functions that are executed on executors; a SparkSession can only be used from driver code.

In your case, everything inside roleList.foreach is executed on the executors, not on the driver.
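One way around that, as a minimal sketch (assuming the role Dataset is small enough to collect to the driver, and reusing the roleList, Role and getDf() names from your snippet), is to bring the data back to the driver and loop there:

    // Bring the roles back to the driver first; the loop body then runs as
    // driver code, where calling getDf() (and spark.read.json) is allowed.
    val rolesOnDriver: Array[Role] = roleList.collect()

    rolesOnDriver.foreach { role =>
        var status = ""
        do {
            val response = getDf()   // now executes on the driver
            response.show()
            status = response.select("status").head().getString(0)
        } while (status != "completed")
    }

Since the loop now runs on the driver, nothing has to be serialized and shipped to the executors.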

Additionally, the same error can arise when you use a variable defined in the enclosing class inside executor code. In that case the whole class has to be sent to the executors, and if it is not serializable you get this error.
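For illustration, here is a hypothetical, cut-down RoleOperations showing how that capture happens and how copying the field into a local val avoids it (the prefix field and printRoles method are made up for the example):

    import org.apache.spark.sql.{Dataset, SparkSession}

    case class Role(name: String)

    class RoleOperations(spark: SparkSession) {
        private val prefix = "ROLE_"            // assumed field, for illustration only

        def printRoles(roleList: Dataset[Role]): Unit = {
            // Referencing `prefix` directly inside foreach would capture `this`
            // (the whole RoleOperations instance) in the closure. Copying it into
            // a local val means only the String itself is serialized.
            val localPrefix = prefix
            roleList.foreach { role =>
                println(localPrefix + role.name)    // closure no longer references `this`
            }
        }
    }

That capture is exactly what the `$outer` field in your serialization stack is pointing at: the anonymous function passed to foreach holds a reference back to the non-serializable RoleOperations instance.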

Upvotes: 2
