Reputation: 2436
I have the following class in Scala:
case class A(a: Int, b: Int) extends Serializable
When I try the following in Spark 2.4 (via Databricks):
val textFile = sc.textFile(...)
val df = textFile.map(_ => new A(2, 3)).toDF()
(Edit: the error happens when I call df.collect() or register it as a table.)
I get org.apache.spark.SparkException: Task not serializable
What am I missing?
I've tried adding encoders:
implicit def AEncoder: org.apache.spark.sql.Encoder[A] =
  org.apache.spark.sql.Encoders.kryo[A]
and
import spark.implicits._
import org.apache.spark.sql.Encoders
Edit: I have also tried:
val df = textFile.map(_ => new A(2, 3)).collect()
but no luck so far.
Upvotes: 0
Views: 135
Reputation: 18023
Sometimes this error occurs intermittently on Databricks, which is most annoying.
Restart the cluster and try again; I have hit this error occasionally, and after a restart it did not recur.
Upvotes: 1
Reputation: 3173
You can parse the file directly as a Dataset using the case class you already have:
case class A(a: Int, b: Int) extends Serializable

val testRDD = spark.sparkContext.textFile("file:///test_file.csv")
val testDS = testRDD
  .map(line => line.split(","))
  .map(line_cols => A(line_cols(0).toInt, line_cols(1).toInt))
  .toDS()
// res23: org.apache.spark.sql.Dataset[A] = [a: int, b: int]
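As a quick check, here is a minimal usage sketch (it assumes test_file.csv contains comma-separated integer pairs such as 2,3; the val name rows is just for illustration):

// Bring the rows back to the driver as a typed Array[A]
val rows: Array[A] = testDS.collect()

// Or print the first rows; the column names come from the case class fields
testDS.show()

Since toDS() picks up the implicit product encoder for the case class from spark.implicits._, no custom Kryo encoder is needed here.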
Upvotes: 0