abhaydubey93
abhaydubey93

Reputation: 21

DataFrame using UDF giving Task not serializable Exception

Trying to use the show() method on a dataframe. It is giving Task not serializable Exception.

I have tried making the enclosing object extend Serializable, but the error still persists.

/**
 * Counts GitHub push events per user login and then keeps only the logins that
 * appear in the employees file, using a broadcast set wrapped in a Spark SQL UDF.
 *
 * NOTE(review): `extends Serializable` is kept from the original for compatibility;
 * it is presumably not what Spark needs here — confirm before relying on it.
 */
object App extends Serializable{
  /**
   * Runs the job on a local Spark session.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    Logger.getLogger("org.apache").setLevel(Level.WARN)

    val spark = SparkSession.builder()
      .appName("LearningSpark")
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext
    val inputPath = "./src/resources/2015-03-01-0.json"
    val ghLog = spark.read.json(inputPath)
    val pushes = ghLog.filter("type = 'PushEvent'")
    val grouped = pushes.groupBy("actor.login").count
    val ordered = grouped.orderBy(grouped("count").desc)
    ordered.show(5)
    val empPath = "./src/resources/ghEmployees.txt"
    // Materialise the trimmed lines into a Set before closing the file, so the
    // lazy line iterator is fully consumed and the file handle is not leaked.
    val empSource = fromFile(empPath)
    val employees: Set[String] =
      try empSource.getLines.map(_.trim).toSet
      finally empSource.close()
    val bcEmployees = sc.broadcast(employees)
    import spark.implicits._
    // The explicit function type is required: without it the lambda parameter
    // has no type and the UDF's input type cannot be derived.
    val isEmp: String => Boolean = user => bcEmployees.value.contains(user)
    val isEmployee = spark.udf.register("SetContainsUdf", isEmp)
    val filtered = ordered.filter(isEmployee($"login"))
    filtered.show()
  }
}

Using Spark's default log4j profile:

org/apache/spark/log4j-defaults.properties
19/09/01 10:21:48 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:393)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$1(RDD.scala:850)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:630)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.$anonfun$doExecute$1(ShuffleExchangeExec.scala:128)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
    at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:151)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:136)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3383)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3364)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:745)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:704)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:713)
    at App$.main(App.scala:33)
    at App.main(App.scala)
Caused by: java.io.NotSerializableException: scala.runtime.LazyRef
Serialization stack:
    - object not serializable (class: scala.runtime.LazyRef, value: LazyRef thunk)
    - element of array (index: 2)
    - array (class [Ljava.lang.Object;, size 3)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.catalyst.expressions.ScalaUDF, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/catalyst/expressions/ScalaUDF.$anonfun$f$2:(Lscala/Function1;Lorg/apache/spark/sql/catalyst/expressions/Expression;Lscala/runtime/LazyRef;Lorg/apache/spark/sql/catalyst/InternalRow;)Ljava/lang/Object;, instantiatedMethodType=(Lorg/apache/spark/sql/catalyst/InternalRow;)Ljava/lang/Object;, numCaptured=3])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$Lambda$2364/2031154005, org.apache.spark.sql.catalyst.expressions.ScalaUDF$$Lambda$2364/2031154005@1fd37440)
    - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
    - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF:SetContainsUdf(actor#6.login))
    - writeObject data (class: scala.collection.immutable.List$SerializationProxy)
    - object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@3b65084e)
    - writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
    - object (class scala.collection.immutable.$colon$colon, List(isnotnull(type#13), (type#13 = PushEvent), UDF:SetContainsUdf(actor#6.login)))
    - field (class: org.apache.spark.sql.execution.FileSourceScanExec, name: dataFilters, type: interface scala.collection.Seq)
    - object (class org.apache.spark.sql.execution.FileSourceScanExec, FileScan json [actor#6,type#13] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/C:/Users/abhaydub/Scala-Spark-workspace/LearningSpark/src/resources/2015-..., PartitionFilters: [], PushedFilters: [IsNotNull(type), EqualTo(type,PushEvent)], ReadSchema: struct<actor:struct<avatar_url:string,gravatar_id:string,id:bigint,login:string,url:string>,type:...
)
    - field (class: org.apache.spark.sql.execution.FilterExec, name: child, type: class org.apache.spark.sql.execution.SparkPlan)
    - object (class org.apache.spark.sql.execution.FilterExec, Filter ((isnotnull(type#13) && (type#13 = PushEvent)) && UDF:SetContainsUdf(actor#6.login))
+- FileScan json [actor#6,type#13] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/C:/Users/abhaydub/Scala-Spark-workspace/LearningSpark/src/resources/2015-..., PartitionFilters: [], PushedFilters: [IsNotNull(type), EqualTo(type,PushEvent)], ReadSchema: struct<actor:struct<avatar_url:string,gravatar_id:string,id:bigint,login:string,url:string>,type:...
)
    - field (class: org.apache.spark.sql.execution.ProjectExec, name: child, type: class org.apache.spark.sql.execution.SparkPlan)
    - object (class org.apache.spark.sql.execution.ProjectExec, Project [actor#6]
+- Filter ((isnotnull(type#13) && (type#13 = PushEvent)) && UDF:SetContainsUdf(actor#6.login))
   +- FileScan json [actor#6,type#13] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/C:/Users/abhaydub/Scala-Spark-workspace/LearningSpark/src/resources/2015-..., PartitionFilters: [], PushedFilters: [IsNotNull(type), EqualTo(type,PushEvent)], ReadSchema: struct<actor:struct<avatar_url:string,gravatar_id:string,id:bigint,login:string,url:string>,type:...
)
    - field (class: org.apache.spark.sql.execution.aggregate.HashAggregateExec, name: child, type: class org.apache.spark.sql.execution.SparkPlan)
    - object (class org.apache.spark.sql.execution.aggregate.HashAggregateExec, HashAggregate(keys=[actor#6.login AS actor#6.login#53], functions=[partial_count(1)], output=[actor#6.login#53, count#43L])
+- Project [actor#6]
   +- Filter ((isnotnull(type#13) && (type#13 = PushEvent)) && UDF:SetContainsUdf(actor#6.login))
      +- FileScan json [actor#6,type#13] Batched:+------------------+-----+
|             login|count|
+------------------+-----+
|      greatfirebot|  192|
|diversify-exp-user|  146|
|     KenanSulayman|   72|
|        manuelrp07|   45|
|    mirror-updates|   42|
+------------------+-----+
only showing top 5 rows

 false, Format: JSON, Location: InMemoryFileIndex[file:/C:/Users/abhaydub/Scala-Spark-workspace/LearningSpark/src/resources/2015-..., PartitionFilters: [], PushedFilters: [IsNotNull(type), EqualTo(type,PushEvent)], ReadSchema: struct<actor:struct<avatar_url:string,gravatar_id:string,id:bigint,login:string,url:string>,type:...
)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 14)
    - element of array (index: 1)
    - array (class [Ljava.lang.Object;, size 3)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.execution.WholeStageCodegenExec, functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/execution/WholeStageCodegenExec.$anonfun$doExecute$4$adapted:(Lorg/apache/spark/sql/catalyst/expressions/codegen/CodeAndComment;[Ljava/lang/Object;Lorg/apache/spark/sql/execution/metric/SQLMetric;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=3])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$1297/815648243, org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$1297/815648243@27438750)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
    ... 48 more

Upvotes: 2

Views: 3060

Answers (3)

A.N.T.
A.N.T.

Reputation: 66

I had spark 2.4.4 with Scala "2.12.1". I encountered the same issue (object not serializable (class: scala.runtime.LazyRef, value: LazyRef thunk)) and it was driving me crazy. I changed Scala version to "2.12.10" and the issue is solved now!

Upvotes: 3

DaRkMaN
DaRkMaN

Reputation: 1054

The serialization issue is not because of object not being Serializable. The object is not serialized and sent to executors for execution, it is the transform code that is serialized.

One of the functions in the code is not Serializable. On looking at the code and the trace, isEmployee seems to be the issue. A couple of observations
1. isEmployee is not a UDF. In Spark, UDF needs to be created by extending org.apache.spark.sql.expressions.UserDefinedFunction which is Serializable, and after defining the function it needs to be registered using org.apache.spark.sql.UDFRegistration#register

I can think of two solutions: 1. Create and register the UDF correctly, so that serialization works as expected
2. Completely avoid UDF and make use of broadcast variable and filter method as follows

val employees: Set[String] = Set("users")
val bcEmployees = sc.broadcast(employees)
// Typed filter instead of a UDF: keep a row only when its first column
// (assumed to hold the user login) is present in the broadcast set.
val filtered = ordered.filter(row => bcEmployees.value.contains(row.getString(0)))
filtered.show()

Upvotes: 1

Ged
Ged

Reputation: 18108

Life is full of mysteries. Serialization is one of them, and some aspects of the spark-shell vs. Databricks Notebooks - which are easier.

https://medium.com/onzo-tech/serialization-challenges-with-spark-and-scala-a2287cd51c54 should be consulted so as to see that extends Serializable as provided at top-level is not the clue; the Driver ships relevant pieces to Executors as far as I understand.

  • If I run your code as is in Databricks Notebook without any extends Serializable, it works fine! In the past I have been able to capture Serialization issues in the Databricks Notebooks - always to-date. Interesting, as in pseudo cluster one should pick up all the possible Serialization issues prior to release I was assured - apparently not so always. Interesting, but a notebook is not spark-submit.

  • If I run it in spark-shell using two consecutive ":paste" blocks (or line by line), as shown below, and (1) omit a few things and (2) wrap the UDF's function in an object that extends Serializable - the UDF operates on a Column, so we adhere to that - it works.

:paste 1

scala> :paste

// Entering paste mode (ctrl-D to finish)

// Holder object for the membership-test function that is later wrapped in a UDF.
// `bcEmployees` is referenced from the enclosing session and must be in scope.
object X extends Serializable {
    // Explicit function type: a bare `user => ...` has no parameter type and does not compile.
    val isEmp: String => Boolean = user => bcEmployees.value.contains(user)
}

:paste 2

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
  .appName("LearningSpark")
  .master("local[*]")
  .getOrCreate()
val sc = spark.sparkContext

// Wrap the function as a UDF so it can be applied to a Column
// (udf(...) only wraps it; nothing is registered by name here).
val isEmployee = udf(X.isEmp)

import scala.io.Source
import spark.implicits._

// Simulated input.
val ghLog = Seq(("john2X0", "push"), ("james09", "abc"), ("peter01", "push"), ("mary99", "push"), ("peter01", "push")).toDF("login", "type")
val pushes = ghLog.filter("type = 'push'")
val grouped = pushes.groupBy("login").count
val ordered = grouped.orderBy(grouped("count").desc)
ordered.show(5)

val emp = "/home/mapr/emp.txt"
// Read every trimmed line into a Set before closing the file, so the lazy
// line iterator is fully consumed and the file handle is released.
val empSource = Source.fromFile(emp)
val employees: Set[String] =
  try empSource.getLines.map(_.trim).toSet
  finally empSource.close()
val bcEmployees = sc.broadcast(employees)

val filtered = ordered.filter(isEmployee($"login"))
filtered.show()

So, the other answer suggests avoiding the UDF altogether, which can be more performant in some cases, but I am sticking with the UDF, which accepts column input and is potentially reusable. This approach works with spark-submit as well - that should be obvious, but it is mentioned for posterity.

Upvotes: 0

Related Questions