Reputation: 440
I have a case class like this:
case class Ais(NotImportant)
    extends Serializable {
  var flag = Ais.Flag.NotFlagged
  var cluster = Ais.Unknown
  var visited = false

  override def toString(): String = {
    s"$cluster,$flag,$visited"
  }
}
After running my algorithm I end up with two Datasets of type (Int, Ais) in which the vars of the Ais objects hold the computed information. I need to union them. What matters most to me are the values of var cluster and var visited, yet after the union they are reset to their default values.
labeledInner.foreach(println(_)) // This is fine
println("==========")
labeledOuter.foreach(println(_)) // This is also fine
println("==========")
labeledOuter.union(labeledInner).foreach(println(_)) // Here everything is set to default
I'm running Spark 2.1 and Scala 2.11.8.
Upvotes: 2
Views: 279
Reputation: 37852
You should not use mutable vars in case classes when using Spark: they do not "survive" Spark's encoding, so any non-trivial use of the Dataset (such as union) that triggers encoding and decoding will not preserve these fields.
Why? Spark has built-in Encoders that efficiently convert objects into a compact binary format (and back). For case classes (actually for all Products, which mostly means case classes and tuples), the encoders only encode the fields that are defined as constructor parameters (in your case, only NotImportant). You can see this by creating the relevant encoder for your case class and checking its schema:
import org.apache.spark.sql.Encoders

case class A(s: String) {
  var a: Int = 0
}

Encoders.product[A].schema.printTreeString()
// root
//  |-- s: string (nullable = true)
As you can see, only s survived; a is not part of the schema.
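The same effect shows up in plain Scala, without Spark. Decoding a Dataset row ends by calling the case class constructor with the decoded field values, so anything rebuilt from constructor parameters starts with the vars at their initial values. The sketch below is only an analogy (it does not use Spark's actual decoder) and reuses the class A from above:

```scala
// Plain Scala, no Spark needed; same A as in the encoder example above.
case class A(s: String) {
  var a: Int = 0
}

object VarFieldDemo {
  def main(args: Array[String]): Unit = {
    val original = A("x")
    original.a = 42

    // The Product view of a case class covers only constructor parameters,
    // so the var is invisible here.
    println(original.productArity)           // 1
    println(original.productIterator.toList) // List(x)

    // Rebuilding from constructor parameters (roughly what decoding a
    // Dataset row does) gives a fresh instance whose var is back at 0.
    val rebuilt = original.copy()
    println(rebuilt.a) // 0

    // equals/hashCode ignore the var as well.
    println(original == rebuilt) // true
  }
}
```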
What's the alternative? When using Spark (and really, Scala in general), you should avoid mutable fields. Try modeling your data so that every field is an immutable constructor parameter, e.g.:
case class Ais(flag: Flag, cluster: Cluster, visited: Boolean)
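With all state in constructor parameters, the encoder schema covers every field. A quick check, assuming (hypothetically) that flag and cluster are simplified to String and Int: the original Flag and Cluster types are not shown, and Spark 2.1's built-in encoders may not handle arbitrary types such as a Scala Enumeration.

```scala
import org.apache.spark.sql.Encoders

// Hypothetical immutable Ais; flag/cluster simplified to String/Int (assumption)
// so the built-in product encoder can derive a schema for them.
case class Ais(flag: String, cluster: Int, visited: Boolean)

object SchemaCheck {
  def main(args: Array[String]): Unit = {
    // Every field is a constructor parameter, so every field is in the schema.
    Encoders.product[Ais].schema.printTreeString()
    // root
    //  |-- flag: string (nullable = true)
    //  |-- cluster: integer (nullable = false)
    //  |-- visited: boolean (nullable = false)
  }
}
```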
Then, to "mutate" these objects, use the copy method, which creates a new instance with some (or none) of the fields changed, e.g.:
val a = Ais(Ais.Flag.NotFlagged, Ais.Unknown, false)
val b = a.copy(visited = true)
These objects are safe to use with Spark: all of their state survives encoding and decoding, and they are immutable.
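Applied to the question's scenario, the labeling step returns a copy inside map instead of assigning to vars, and the values then survive the union. This is a minimal sketch assuming spark-sql 2.1 on the classpath, a local SparkSession, and a hypothetical simplified Ais with String/Int fields; the keys and cluster ids are made up for illustration:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical simplified Ais: all state lives in constructor parameters.
// Defined at top level so Spark can derive an encoder for it.
case class Ais(flag: String, cluster: Int, visited: Boolean)

object UnionDemo {
  // Builds two labeled Datasets, unions them and returns the rows sorted by key.
  def mergedLabels(spark: SparkSession): Array[(Int, Ais)] = {
    import spark.implicits._
    val unlabeled = Ais("NotFlagged", -1, visited = false)

    // "Mutate" via copy inside map instead of assigning to vars.
    val labeledInner = Seq((1, unlabeled)).toDS()
      .map { case (id, p) => (id, p.copy(cluster = 7, visited = true)) }
    val labeledOuter = Seq((2, unlabeled)).toDS()
      .map { case (id, p) => (id, p.copy(cluster = 9, visited = true)) }

    labeledOuter.union(labeledInner).collect().sortBy(_._1)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]") // assumption: a local run for experimenting
      .appName("union-demo")
      .getOrCreate()
    try mergedLabels(spark).foreach(println)
    finally spark.stop()
    // (1,Ais(NotFlagged,7,true))
    // (2,Ais(NotFlagged,9,true))
  }
}
```

Because cluster and visited are now part of the encoded schema, the round trip through union no longer discards them.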
Upvotes: 4