Pawel Wisniewski

Reputation: 440

Spark dataset union resets class variables

I have a case class like this:

case class Ais(NotImportant)
  extends Serializable {


  var flag = Ais.Flag.NotFlagged
  var cluster = Ais.Unknown
  var visited = false

  override def toString(): String = {
    s"$cluster,$flag,$visited"
  }
}

After running my algorithm I end up with two Datasets of type (Int, Ais), where the variables in the Ais objects hold the results. I need to union them. The values that matter most to me are var cluster and var visited, yet after the union they are reset to their default values.

labeledInner.foreach(println(_)) // This is fine
println("==========")
labeledOuter.foreach(println(_)) // This is also fine
println("==========")
labeledOuter.union(labeledInner).foreach(println(_)) // Here everything is set to default

I'm running Spark 2.1 and Scala 2.11.8.

Upvotes: 2

Views: 279

Answers (1)

Tzach Zohar

Reputation: 37852

You should not use mutable vars in case classes with Spark: they do not "survive" Spark's encoding, so any non-trivial use of the Dataset that triggers encoding and decoding (union, for example) will not preserve these fields.

Why? Spark has built-in Encoders that are meant to efficiently encode objects into byte-arrays (and back). For case classes (actually, for all Products, which mostly means case classes and tuples), the encoders only encode the case-class fields that are defined as class parameters (in your case, only NotImportant). You can see this by creating the relevant encoder for your case class and checking its schema:

import org.apache.spark.sql.Encoders

case class A(s: String) {
  var a: Int = 0
}

Encoders.product[A].schema.printTreeString()
// root
// |-- s: string (nullable = true)

As you can see, only s survived; a is not part of the schema.
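The same asymmetry can be observed in plain Scala, without Spark on the classpath. The sketch below is only an illustration and makes no claim about how Spark's encoder is implemented: it shows that the Product view of a case class (productArity, productIterator) and its generated equality cover the constructor parameters only, so an instance rebuilt from those parameters starts again from the default value of the var. The names ProductDemo, original and rebuilt are made up for this example.

```scala
// Same shape as the class above: one constructor parameter, one body-level var.
case class A(s: String) {
  var a: Int = 0
}

object ProductDemo {
  val original = A("x")
  original.a = 42 // mutate the body-level var

  // The Product view exposes the constructor parameter only.
  val arity: Int = original.productArity
  val elements: List[Any] = original.productIterator.toList

  // Rebuilding from the constructor parameters alone loses the var's value.
  val rebuilt: A = A(original.s)

  // The generated equals ignores the var as well.
  val stillEqual: Boolean = original == rebuilt

  def main(args: Array[String]): Unit = {
    println(s"arity=$arity elements=$elements")
    println(s"original.a=${original.a} rebuilt.a=${rebuilt.a} equal=$stillEqual")
  }
}
```

Anything that reconstructs the object from its declared fields behaves like rebuilt here, which matches the reset values seen after the union.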

What's the alternative? When using Spark (and really, Scala in general), you should avoid mutable fields. Try modeling your data so that every field is an immutable constructor parameter, e.g.:

case class Ais(flag: Flag, cluster: Cluster, visited: Boolean)

Then, to "mutate" these objects, you can use the copy method that creates a new instance with some (or none) of the fields changed, e.g.:

val a = Ais(Ais.Flag.NotFlagged, Ais.Unknown, false)
val b = a.copy(visited = true)

These objects will be safe to use with Spark (they "survive" serialization and are immutable).
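As a rough sketch of what a labeling step could look like with this model, the example below updates every element through copy instead of assigning to a var. Several things here are assumptions rather than part of the original code: flag and cluster are represented as String and Int because the real Flag and Cluster types are not shown, the cluster id 7 and the placeholder -1 are arbitrary, and a plain Seq stands in for a Dataset[(Int, Ais)] so the snippet runs without Spark.

```scala
// Hypothetical stand-in: flag and cluster are modelled as String and Int
// because the original Flag and Cluster types are not shown.
case class Ais(flag: String, cluster: Int, visited: Boolean)

object CopyDemo {
  // A plain Seq stands in for a Dataset[(Int, Ais)] in this sketch.
  val labeled: Seq[(Int, Ais)] = Seq(
    (1, Ais("NotFlagged", -1, visited = false)),
    (2, Ais("NotFlagged", -1, visited = false))
  )

  // "Mutate" by building new instances: assign cluster 7 and mark as visited.
  val updated: Seq[(Int, Ais)] = labeled.map { case (id, ais) =>
    (id, ais.copy(cluster = 7, visited = true))
  }

  def main(args: Array[String]): Unit = updated.foreach(println)
}
```

The same map function should carry over to a Dataset once an encoder for (Int, Ais) is in scope (typically via import spark.implicits._); because cluster and visited are now constructor parameters, they are part of the encoded schema.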

Upvotes: 4
