Reputation: 10324
I am running a program on Apache Flink. I got this error:
Caused by: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
How can I check the serialization method of an object in Scala/Java? And how can I check how it is serialized by Kryo?
EDIT: full exception follows:
Caused by: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
Caused by: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
Caused by: java.lang.IndexOutOfBoundsException: Index: 103, Size: 1
at java.util.ArrayList.rangeCheck(
at java.util.ArrayList.get(
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(
at com.esotericsoftware.kryo.Kryo.readClassAndObject(
at org.apache.flink.api.common.typeutils.base.GenericArraySerializer.deserialize(
at org.apache.flink.api.common.typeutils.base.GenericArraySerializer.deserialize(
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:107)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:100)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:28)
... 5 more
This is the type of the Flink DataSet we are using:
(Long, String, Long, Long, Char, Array[GValue], Array[List[GValue]], Int, Long, Long, Long, Long, Long)
/**
 * Common supertype of the value wrappers [[GInt]], [[GDouble]], [[GString]] and [[GNull]].
 *
 * Three comparison entry points are exposed:
 *  - `compare`   strict ordering; refuses to compare values of different kinds.
 *  - `compareTo` lenient ordering; values of different kinds compare as 0.
 *  - `equal`     value equality between two wrappers of the same kind.
 *
 * `compareTo` deliberately overrides the default supplied by `Ordered` (which would
 * simply forward to `compare`) so that Java-side callers going through `Comparable`
 * never see an exception for mixed kinds.
 */
sealed trait GValue extends Serializable with Comparable[GValue] with Ordered[GValue] {

  /**
   * Strictly compares this value with `o`.
   *
   * @param o the value to compare against
   * @return a negative, zero or positive number when both values are of the same kind;
   *         0 whenever `o` is a [[GNull]]
   * @throws ClassCastException if the two values are of different kinds
   */
  def compare(o: GValue): Int =
    (this, o) match {
      case (GDouble(a), GDouble(b)) => a compare b
      case (GString(a), GString(b)) => a compare b
      case (GInt(a), GInt(b))       => a compare b
      // Anything compared against a null marker is considered equal to it.
      case (_, GNull())             => 0
      case _ =>
        val otherKind = Option(o).map(_.getClass.getName).getOrElse("null")
        throw new ClassCastException(
          s"${getClass.getName} cannot be compared with $otherKind")
    }

  /**
   * Value equality between this wrapper and `o`.
   *
   * The receiver is part of the match: the previous implementation only inspected
   * `o` and compared it with itself, so it answered `true` for nearly every argument.
   *
   * @param o the value to compare against
   * @return `true` only if both values are of the same kind and wrap equal contents
   */
  def equal(o: GValue): Boolean =
    (this, o) match {
      case (GInt(a), GInt(b))       => a == b
      // Boxed equality, matching the equals override on GDouble (NaN equals NaN).
      case (GDouble(a), GDouble(b)) => a.equals(b)
      // A null string on the receiver never equals anything (previously an NPE mapped to false).
      case (GString(a), GString(b)) => a != null && a == b
      case (GNull(), GNull())       => true
      case _                        => false
    }

  /**
   * Lenient comparison used through `java.lang.Comparable`.
   *
   * The receiver is part of the match: the previous implementation compared `o`
   * with itself and therefore always returned 0.
   *
   * @param o the value to compare against
   * @return the ordering of the wrapped contents when both values are of the same kind;
   *         0 for mixed kinds, for [[GNull]], for a null argument or for null strings
   */
  override def compareTo(o: GValue): Int =
    (this, o) match {
      case (GInt(a), GInt(b))       => java.lang.Integer.compare(a, b)
      case (GDouble(a), GDouble(b)) => java.lang.Double.compare(a, b)
      case (GString(a), GString(b)) if a != null && b != null => a.compareTo(b)
      case _                        => 0
    }
}
/**
 * Represents a [[GValue]] that contains an integer.
 * @deprecated
 * @param v the wrapped integer
 */
// GValue variant wrapping an Int in the field v.
case class GInt(v: Int) extends GValue{
// Auxiliary no-argument constructor; wraps 0.
def this() = this(0)
// NOTE(review): the body of toString is not visible in this excerpt.
override def toString() : String = {
// Equal only to another GInt whose wrapped value equals v; any other argument yields false.
override def equals(other : Any) : Boolean = {
other match {
case GInt(value) => value.equals(v)
case _ => false
/**
 * Represents a [[GValue]] that contains a number as a [[Double]].
 * @param v number
 */
// GValue variant wrapping a Double in the field v.
case class GDouble(v: Double) extends GValue {//with Ordered[GDouble]{
// Auxiliary no-argument constructor; wraps 0.0.
def this() = this(0.0)
// Sets up a DecimalFormat with English symbols and the pattern "#.########"
// (up to eight optional fraction digits).
// NOTE(review): the expression that produces the returned string is not visible in
// this excerpt; presumably it formats v with df. Confirm against the full source.
override def toString() : String = {
val dfs = new DecimalFormatSymbols(Locale.ENGLISH);
val df = new DecimalFormat("#.########", dfs);
// Equal only to another GDouble whose wrapped value equals v; any other argument yields false.
override def equals(other : Any) : Boolean = {
other match {
case GDouble(value) => value.equals(v)
case _ => false
/**
 * Represents a [[GValue]] that contains a [[String]].
 * @param v string
 */
// GValue variant wrapping a String in the field v.
case class GString(v: String) extends GValue{
// Auxiliary no-argument constructor; wraps the placeholder string ".".
def this() = this(".")
// NOTE(review): the body of toString is not visible in this excerpt.
override def toString() : String = {
// Equal only to another GString whose wrapped string equals v; any other argument yields false.
// value.equals(v) is invoked on the other instance's string, so a null there would throw.
override def equals(other : Any) : Boolean = {
other match {
case GString(value) => value.equals(v)
case _ => false
// GValue variant with no payload.
case class GNull() extends GValue{
// NOTE(review): the body of toString is not visible in this excerpt.
override def toString() : String = {
// Every GNull equals every other GNull; any other argument yields false.
override def equals(other : Any) : Boolean = {
other match {
case GNull() => true
case _ => false
Upvotes: 0
Views: 965