Matthew Formosa

Reputation: 468

Spark Kryo Serialization

We have a Spark Structured Streaming application that consumes from a Kafka topic in Avro format. The payload is part of the state object in the mapGroupsWithState function. Given that we enforce FULL compatibility for our Avro schemas, we generally do not face problems when evolving them. However, we have now evolved our schema by adding a nested object, and Kryo serialization is failing with the error below, where xyz is the field within ObjectV1 that holds the new nested object:

Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: 020-12-14T19:23:49
Serialization trace:
xyz (x.y.z.ObjectV1)
Logical Plan:
SerializeFromObject [encodeusingserializer(input[0, java.lang.Object, true], true) AS value#38]
+- MapPartitions <function1>, obj#37: scala.Tuple2
   +- DeserializeToObject decodeusingserializer(cast(value#34 as binary), scala.Option, true), obj#36: scala.Option
      +- SerializeFromObject [encodeusingserializer(input[0, java.lang.Object, true], true) AS value#34]
         +- FlatMapGroupsWithState <function3>, newInstance(class scala.Tuple4), decodeusingserializer(cast(value#23 as binary), scala.Tuple2, true), [_1#29, _2#30, _3#31, _4#32L], [value#23], obj#33: scala.Option, class[value[0]: binary], Update, true, ProcessingTimeTimeout
            +- AppendColumns <function1>, class scala.Tuple2, [StructField(value,BinaryType,true)], decodeusingserializer(cast(value#23 as binary), scala.Tuple2, true), [assertnotnull(assertnotnull(input[0, scala.Tuple4, true]))._1 AS _1#29, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple4, true]))._2, true, false) AS _2#30, assertnotnull(assertnotnull(input[0, scala.Tuple4, true]))._3 AS _3#31, assertnotnull(assertnotnull(input[0, scala.Tuple4, true]))._4 AS _4#32L]
               +- SerializeFromObject [encodeusingserializer(input[0, java.lang.Object, true], true) AS value#23]
                  +- MapElements <function1>, interface org.apache.spark.sql.Row, [StructField(key,BinaryType,true), StructField(value,BinaryType,true), StructField(topic,StringType,true), StructField(partition,IntegerType,true), StructField(offset,LongType,true), StructField(timestamp,TimestampType,true), StructField(timestampType,IntegerType,true)], obj#22: scala.Tuple2
                     +- DeserializeToObject createexternalrow(key#7, value#8, topic#9.toString, partition#10, offset#11L, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, timestamp#12, true, false), timestampType#13, StructField(key,BinaryType,true), StructField(value,BinaryType,true), StructField(topic,StringType,true), StructField(partition,IntegerType,true), StructField(offset,LongType,true), StructField(timestamp,TimestampType,true), StructField(timestampType,IntegerType,true)), obj#21: org.apache.spark.sql.Row
                        +- StreamingExecutionRelation KafkaV2[Subscribe[ext_object_v1]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]

The Spark version is 2.4.5. Has anyone come across something similar? Deleting the checkpoint folder resolves the issue, but naturally we would like to avoid doing that.
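
For context, here is a minimal sketch in Scala of the kind of pipeline described above. All names are illustrative (ObjectV1, Nested, xyz, and the broker address stand in for our actual classes and infrastructure); the relevant point is that the state object is Kryo-encoded, so the bytes persisted in the checkpoint are tied to the class layout at the time they were written:

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Illustrative state classes: the evolved ObjectV1 gains a nested object (xyz).
case class Nested(createdAt: String)
case class ObjectV1(id: String, xyz: Nested)

object StateDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("kryo-state-demo").getOrCreate()
    import spark.implicits._

    // The state type is Kryo-encoded; this exact-type implicit takes
    // precedence over the generic product encoder from spark.implicits.
    implicit val stateEncoder: Encoder[ObjectV1] = Encoders.kryo[ObjectV1]

    val input = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092") // illustrative address
      .option("subscribe", "ext_object_v1")
      .load()
      .selectExpr("CAST(key AS STRING) AS key", "value")
      .as[(String, Array[Byte])]

    val updated = input
      .groupByKey(_._1)
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout) {
        (key: String, rows: Iterator[(String, Array[Byte])], state: GroupState[ObjectV1]) =>
          // Reading the state triggers Kryo deserialization of bytes that may
          // have been written with the previous version of ObjectV1.
          val current = state.getOption.getOrElse(ObjectV1(key, Nested("")))
          state.update(current)
          current
      }

    updated.writeStream
      .outputMode(OutputMode.Update())
      .format("console")
      .start()
      .awaitTermination()
  }
}

Our understanding is that Kryo's default FieldSerializer writes fields positionally without embedding any schema information, so the FULL compatibility we enforce at the Avro level does not carry over to the Kryo-serialized state bytes stored in the checkpoint.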

Upvotes: 3

Views: 404

Answers (0)
