Reputation: 1951
The purpose of the following examples is to understand the difference between the two encoders that can be used with a Spark Dataset.
I can do this:
val df = Seq((1, "a"), (2, "d")).toDF("id", "value")
import org.apache.spark.sql.{Encoder, Encoders, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
val myStructType = StructType(Seq(StructField("id", IntegerType), StructField("value", StringType)))
implicit val myRowEncoder = RowEncoder(myStructType)
val ds = df.map{case row => row}
ds.show
//+---+-----+
//| id|value|
//+---+-----+
//| 1| a|
//| 2| d|
//+---+-----+
I can also do this:
val df = Seq((1, "a"), (2, "d")).toDF("id", "value")
import org.apache.spark.sql.{Encoder, Encoders, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
implicit val myKryoEncoder: Encoder[Row] = Encoders.kryo[Row]
val ds = df.map{case row => row}
ds.show
//+--------------------+
//| value|
//+--------------------+
//|[01 00 6F 72 67 2...|
//|[01 00 6F 72 67 2...|
//+--------------------+
The only difference between the two snippets is that one uses the Kryo encoder and the other uses RowEncoder.
Question:
Upvotes: 4
Views: 5318
Reputation: 2702
TL;DR: don't trust the show() method output. The internal structure of your data is not lost.
I was confused about how the show() method rendered the content of the tuples as one column named 'value' with binary content. I concluded (incorrectly) that the dataframe was just a one-column binary blob that no longer adhered to the structure of a Tuple2[Integer,String] with columns named id and value. However, when I printed the actual content of the collected data frame, I saw the correct values, column names and types. So I think this is just an issue with the show() method.
The program below should serve to reproduce my results:
import java.util
import org.apache.spark.sql.{Encoder, Encoders, Row, SparkSession}

object X extends App {
  val sparkSession = SparkSession.builder().appName("tests")
    .master("local")
    .config("", "")
    .getOrCreate()
  import sparkSession.implicits._
  val df = Seq((1, "a"), (2, "d")).toDF("id", "value")
  // Every Row produced by map is now serialized with Kryo
  implicit val myKryoEncoder: Encoder[Row] = Encoders.kryo[Row]
  val ds = df.map{case row => row}
  ds.show // This shows only one column 'value' w/ binary content
  // This shows that the schema and values are actually correct. The below will print:
  // row schema:StructType(StructField(id,IntegerType,false),StructField(value,StringType,true))
  // row:[1,a]
  // row schema:StructType(StructField(id,IntegerType,false),StructField(value,StringType,true))
  // row:[2,d]
  val collected: util.List[Row] = ds.collectAsList()
  collected.forEach{ row =>
    System.out.println("row schema:" + row.schema)
    System.out.println("row:" + row)
  }
}
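A complementary check, offered as a sketch rather than as part of the original program: it prints the schema of the Dataset itself next to the schema carried by each collected Row. It assumes a local SparkSession; the names kryoRowEncoder and the app name are illustrative only. The Dataset-level schema should be a single binary column, which is what show() renders, while the structure reported above lives inside the deserialized Row objects.

```scala
import org.apache.spark.sql.{Encoder, Encoders, Row, SparkSession}

// Assumption: a local session; in spark-shell, getOrCreate() reuses the existing one.
val spark = SparkSession.builder().appName("kryo-schema-check").master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq((1, "a"), (2, "d")).toDF("id", "value")

// Pass the Kryo encoder explicitly instead of relying on implicit resolution.
val kryoRowEncoder: Encoder[Row] = Encoders.kryo[Row]
val ds = df.map(row => row)(kryoRowEncoder)

// Schema of the Dataset as Spark SQL sees it: one binary column named "value".
ds.printSchema()

// Schema attached to each Row object after Kryo deserialization on the driver.
ds.collect().foreach(row => println(row.schema))
```

Read together, the two printouts suggest that show() is reporting the stored representation accurately, and that the original columns only reappear once the Kryo bytes are turned back into Row objects.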
Upvotes: 0
Reputation: 18098
According to Spark's documentation, Spark SQL does NOT use Kryo or Java serialization by default.
Kryo is for RDDs and not DataFrames or Datasets. Hence the question is a little off-beam afaik.
Does Kryo help in SparkSQL? This elaborates on custom objects, but...
UPDATED Answer after some free time
Your example was not really what I would call a custom type. They are just structs with primitives. No issue.
Kryo is a serializer; Datasets and DataFrames use Encoders for their columnar advantage. Kryo is used internally by Spark for shuffling.
This user defined example
case class Foo(name: String, position: Point)
is one that we can handle with a DS or DF, or via Kryo. But what is the point of Kryo when Tungsten and Catalyst work by "understanding the structure of the data" and are thus able to optimize? With Kryo you also get a single binary value, and I have found few examples of how to work with it successfully, e.g. in a JOIN.
KRYO Example
import org.apache.spark.sql.{Encoder, Encoders, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import spark.implicits._
case class Point(a: Int, b: Int)
case class Foo(name: String, position: Point)
implicit val PointEncoder: Encoder[Point] = Encoders.kryo[Point]
implicit val FooEncoder: Encoder[Foo] = Encoders.kryo[Foo]
val ds = Seq(new Foo("bar", new Point(0, 0))).toDS
ds.show()
returns:
+--------------------+
| value|
+--------------------+
|[01 00 D2 02 6C 6...|
+--------------------+
Encoder for DS using case class Example
import org.apache.spark.sql.{Encoder, Encoders, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import spark.implicits._
case class Point(a: Int, b: Int)
case class Foo(name: String, position: Point)
val ds = Seq(new Foo("bar", new Point(0, 0))).toDS
ds.show()
returns:
+----+--------+
|name|position|
+----+--------+
| bar| [0, 0]|
+----+--------+
This strikes me as the way to go with Spark, Tungsten, Catalyst.
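To illustrate what "understanding the structure of the data" buys you, here is a minimal sketch that filters and projects on the nested columns of the case class Dataset. It assumes a spark-shell style session (case classes defined at the top level); the second record and the name atOrigin are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

case class Point(a: Int, b: Int)
case class Foo(name: String, position: Point)

// Assumption: a local session; in spark-shell, getOrCreate() reuses the existing one.
val spark = SparkSession.builder().appName("product-encoder-sketch").master("local[*]").getOrCreate()
import spark.implicits._

// No Kryo encoders in scope, so the built-in case class encoder is used.
val ds = Seq(Foo("bar", Point(0, 0)), Foo("baz", Point(1, 2))).toDS()

// position.a is a real column here, so it can be referenced in a filter.
val atOrigin = ds.filter($"position.a" === 0).select($"name")
atOrigin.show()
```

With the Kryo encoders from the previous example, the only column is the binary value, so an expression such as $"position.a" would have nothing to resolve against.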
Things get more complicated when an Any is involved, but Any is not a good thing:
val data = Seq(
("sublime", Map(
"good_song" -> "santeria",
"bad_song" -> "doesn't exist")
),
("prince_royce", Map(
"good_song" -> 4,
"bad_song" -> "back it up")
)
)
val schema = List(
("name", StringType, true),
("songs", MapType(StringType, StringType, true), true)
)
val rdd= spark.sparkContext.parallelize(data)
rdd.collect
val df = spark.createDataFrame(rdd)
df.show()
df.printSchema()
returns:
java.lang.UnsupportedOperationException: No Encoder found for Any.
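One way around the error, sketched here as an assumption about what the data is meant to hold: keep the map values homogeneous (the 4 becomes the string "4"), so that Scala infers Map[String, String] instead of Map[String, Any] and the built-in encoders apply.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; in spark-shell, getOrCreate() reuses the existing one.
val spark = SparkSession.builder().appName("no-any-sketch").master("local[*]").getOrCreate()
import spark.implicits._

// All map values are Strings, so the element type is (String, Map[String, String]).
val data = Seq(
  ("sublime", Map("good_song" -> "santeria", "bad_song" -> "doesn't exist")),
  ("prince_royce", Map("good_song" -> "4", "bad_song" -> "back it up"))
)

val df = data.toDF("name", "songs")
df.printSchema()
df.show(false)
```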
There is also an interesting example of a valid custom object use case: Spark No Encoder found for java.io.Serializable in Map[String, java.io.Serializable]. But I would stay away from such things.
Conclusions
Kryo vs Encoder vs Java Serialization in Spark? states that Kryo is for RDDs, but that is the legacy view; internally one can still use it. Not 100% correct, but to the point.
Spark: Dataset Serialization is also an informative link.
Things have evolved, and the spirit is to not use Kryo for DS and DF.
Hope this helps.
Upvotes: 2
Reputation: 11807
Encoders.kryo simply creates an encoder that serializes objects of type T using Kryo.
RowEncoder is an object in Scala with apply and other factory methods. RowEncoder can create an ExpressionEncoder[Row] from a schema. Internally, apply creates a BoundReference for the Row type and returns an ExpressionEncoder[Row] for the input schema, a CreateNamedStruct serializer (using the serializerFor internal method), a deserializer for the schema, and the Row type.
RowEncoder knows about schema and uses it for serialization and deserialization.
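The difference can be seen without running a job by asking each encoder for its schema. This is a sketch that assumes a Spark version in which RowEncoder(schema) is available, as in the question; the names rowEncoder and kryoEncoder are illustrative.

```scala
import org.apache.spark.sql.{Encoder, Encoders, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._

val myStructType = StructType(Seq(
  StructField("id", IntegerType),
  StructField("value", StringType)
))

// Schema-aware encoder: one column per field of the struct.
val rowEncoder: Encoder[Row] = RowEncoder(myStructType)

// Kryo encoder: the whole Row becomes one opaque binary column.
val kryoEncoder: Encoder[Row] = Encoders.kryo[Row]

println(rowEncoder.schema.fieldNames.mkString(", "))
println(kryoEncoder.schema.fieldNames.mkString(", "))
```

The first line should list id and value, the second only value, which matches the two show() outputs in the question.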
Kryo is significantly faster and more compact than Java serialization (often as much as 10x), but does not support all Serializable types and requires you to register the classes you’ll use in the program in advance for best performance.
Kryo is good for efficiently storing large datasets and for network-intensive applications.
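For reference, registering classes in advance looks roughly like the sketch below. Note that this configures the serializer Spark uses for RDDs and shuffles, which is separate from choosing Encoders.kryo for a Dataset. The Point class and the app name are placeholders.

```scala
import org.apache.spark.SparkConf

// Placeholder class standing in for whatever your job actually serializes.
case class Point(a: Int, b: Int)

val conf = new SparkConf()
  .setAppName("kryo-registration-sketch")
  .setMaster("local[*]")
  // Switch the RDD/shuffle serializer from Java serialization to Kryo.
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Register classes up front so Kryo does not write full class names with each object.
  .registerKryoClasses(Array(classOf[Point]))
```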
For more information you can refer to these links:
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-RowEncoder.html
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-Encoders.html
https://medium.com/@knoldus/kryo-serialization-in-spark-55b53667e7ab
https://stackoverflow.com/questions/58946987/what-are-the-pros-and-cons-of-java-serialization-vs-kryo-serialization#:~:text=Kryo%20is%20significantly%20faster%20and,in%20advance%20for%20best%20performance.
Upvotes: 4