Reputation: 3517

Spark not serializable issue

I'm working on refactoring our code so that we can use the Cake pattern for DI.

I've stumbled upon a serialisation issue which I'm having difficulty understanding.

When I call this function:

def getAccounts(winZones: Broadcast[List[WindowsZones]]): RDD[AccountDetails] = {
  val accounts = getAccounts //call to db

  val result = accounts.map(row =>
    Some(AccountDetails(UUID.fromString(row.getAs[String]("")),
      row.getAs[String](""),
      UUID.fromString(row.getAs[String]("")),
      row.getAs[String](""),
      row.getAs[String](""),
      DateUtils.getIanaZoneFromWinZone(row.getAs[String]("timeZone"), winZones))))
    .map(m => m.get)
  result
}

it works perfectly, but it's ugly and I want to refactor it so that the middle mapping from Row to AccountDetails is placed inside a private function - but when I do that, it causes the serialisation issue.

I'd like:

def getAccounts(winZones: Broadcast[List[WindowsZones]]): RDD[AccountDetails] = {
  val accounts = getAccounts //call to db

  val result = accounts
    .map(m => getAccountDetails(m, winZones))
    .filter(_.isDefined)
    .map(m => m.get)
  result
}

private def getAccountDetails(row: Row, winZones: Broadcast[List[WindowsZones]]): Option[AccountDetails] = {
  try {
    Some(AccountDetails(UUID.fromString(row.getAs[String]("")),
      row.getAs[String](""),
      UUID.fromString(row.getAs[String]("")),
      row.getAs[String](""),
      row.getAs[String](""),
      DateUtils.getIanaZoneFromWinZone(row.getAs[String]("timeZone"), winZones)))
  }
  catch {
    case e: Exception =>
      logger.error(s"Unable to set AccountDetails $e")
      None
  }
}

Any help is appreciated, of course. The AccountDetails obj is a case class, should that be pertinent. I'm also happy to take any other advice on implementing Cake or DI with Spark in general. Thanks.

Edit to show structure:

trait serviceImpl extends anotherComponent { this: DBA =>
  def Accounts = new Accounts

  class Accounts extends AccountService {
    //the methods above are defined here
  }
}

Edit to include stacktrace:

17/02/13 17:32:32 INFO CodeGenerator: Code generated in 271.36617 ms
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2039)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.RDD.map(RDD.scala:365)
    at FunnelServiceComponentImpl$FunnelAccounts.getAccounts(FunnelServiceComponentImpl.scala:24)
    at Main$.delayedEndpoint$Main$1(Main.scala:26)
    at Main$delayedInit$body.apply(Main.scala:7)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)
    at Main$.main(Main.scala:7)
    at Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.io.NotSerializableException: FunnelServiceComponentImpl$FunnelAccounts
Serialization stack:
    - object not serializable (class: FunnelServiceComponentImpl$FunnelAccounts, value: FunnelServiceComponentImpl$FunnelAccounts@16b7e04a)
    - field (class: FunnelServiceComponentImpl$FunnelAccounts$$anonfun$1, name: $outer, type: class FunnelServiceComponentImpl$FunnelAccounts)
    - object (class FunnelServiceComponentImpl$FunnelAccounts$$anonfun$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 26 more
17/02/13 17:32:32 INFO SparkContext: Invoking stop() from shutdown hook

Upvotes: 0

Views: 2430

Answers (2)

Joe C

Reputation: 15674

Because getAccountDetails is in your class, Spark will want to serialize your entire FunnelAccounts object. After all, you need an instance in order to use this method. However, FunnelAccounts is not serializable. Thus it can't be sent off to a worker.

In your case, you should move getAccountDetails into a FunnelAccounts object, so that you don't need an instance of FunnelAccounts to run it.
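A minimal sketch of that refactor (the AccountDetailsMapper name is hypothetical, the column names stay elided as in the question, and the question's AccountDetails, WindowsZones and DateUtils types are assumed to be in scope):

import java.util.UUID

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.Row

// A top-level object has no enclosing instance, so the closure passed to
// RDD.map captures only the Broadcast handle (which is serializable),
// never a FunnelAccounts instance.
object AccountDetailsMapper {
  def getAccountDetails(row: Row, winZones: Broadcast[List[WindowsZones]]): Option[AccountDetails] =
    try {
      Some(AccountDetails(
        UUID.fromString(row.getAs[String]("")),
        row.getAs[String](""),
        UUID.fromString(row.getAs[String]("")),
        row.getAs[String](""),
        row.getAs[String](""),
        DateUtils.getIanaZoneFromWinZone(row.getAs[String]("timeZone"), winZones)))
    } catch {
      case e: Exception =>
        None //a logger field here would reintroduce the problem unless it is serializable or @transient
    }
}

Inside getAccounts, the call site then becomes accounts.map(row => AccountDetailsMapper.getAccountDetails(row, winZones)), and nothing non-serializable is captured.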

Upvotes: 1

Assaf Mendelson

Reputation: 12991

Where are you defining the functions?

Let's say you are defining them in a class X. If that class is not serializable, this would cause your issue.

To solve this, you can either make it an object instead or make the class serializable.
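A minimal sketch of both options, using a hypothetical doubling function in place of the question's row mapping:

import org.apache.spark.rdd.RDD

// Option 1: make the class serializable. The whole instance is then shipped
// with every task, so all of its fields must be serializable too.
class Doubler extends Serializable {
  private def double(x: Int): Int = 2 * x            //instance method: the closure captures `this`
  def run(rdd: RDD[Int]): RDD[Int] = rdd.map(double) //fine, because Doubler is Serializable
}

// Option 2: host the function in an object instead. The closure then refers
// to the singleton statically and captures no enclosing instance at all.
object MapFns {
  def double(x: Int): Int = 2 * x
}
class Doubler2 {
  def run(rdd: RDD[Int]): RDD[Int] = rdd.map(MapFns.double) //fine, no $outer reference
}

With the cake pattern, option 2 is usually the safer choice, since it keeps the component instances out of Spark's closures entirely.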

Upvotes: 2
