Flink throws java.io.NotSerializableException

I made a custom KeyedDeserializationSchema to deserialize Kafka messages, and I am using it like this:

import java.util.Properties

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema

object Job {
  case class KafkaMsg[K, V](
    key: K, value: V, topic: String, partition: Int, offset: Long)

  trait Deser[A] {
    def deser(a: Array[Byte]): A
  }

  object Deser {

    def apply[A](implicit sh: Deser[A]): Deser[A] = sh
    def deser[A: Deser](a: Array[Byte]) = Deser[A].deser(a)

    implicit val stringDeser: Deser[String] =
      new Deser[String] {
        def deser(a: Array[Byte]): String = "" // stub decoder, just for this example
      }

    implicit val longDeser: Deser[Long] =
      new Deser[Long] {
        def deser(a: Array[Byte]): Long = 0 // stub decoder, just for this example
      }
  }

  class TypedKeyedDeserializationSchema[
    K: Deser: TypeInformation,
    V: Deser: TypeInformation
  ] extends KeyedDeserializationSchema[KafkaMsg[K, V]] {

    def deserialize(key:   Array[Byte],
                    value: Array[Byte],
                    topic: String,
                    partition: Int,
                    offset:    Long
    ): KafkaMsg[K, V] =
      KafkaMsg(Deser[K].deser(key),
               Deser[V].deser(value),
               topic,
               partition,
               offset
      )

    def isEndOfStream(e: KafkaMsg[K, V]): Boolean = false

    def getProducedType(): TypeInformation[KafkaMsg[K, V]] =
      createTypeInformation
  }

  def main(args: Array[String]): Unit = {
    val properties = new Properties
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "flink-test")

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val stream = env
        .addSource(new FlinkKafkaConsumer011(
                     "topic",
                     new TypedKeyedDeserializationSchema[String, Long],
                     properties
                   ))
        .print

    env.execute("Flink Scala API Skeleton")
  }
}

Which gives me:

[error] Caused by: java.io.NotSerializableException: l7.Job$Deser$$anon$7
[error]         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
[error]         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
[error]         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
[error]         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
[error]         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
[error]         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
[error]         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
[error]         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
[error]         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
[error]         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
[error]         at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
[error]         at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:170)
[error]         at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
[error]         at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:670)
[error]         at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:600)
[error]         at l7.Job$.main(Job.scala:89)
[error]         at l7.Job.main(Job.scala)

The problem is obviously in my Deser type-class-like implementation, but I don't understand what exactly causes this error or how to fix it.

Answers (1)

SergGr

Yes, the reason for this error is that your Deser, unlike TypeInformation, does not extend/implement Serializable. To see why that matters, start by asking yourself: why do I need to declare implicit val stringDeser and implicit val longDeser in the first place?

The answer lies in what the Scala compiler does when it sees a context bound of the form K: Deser: TypeInformation: it rewrites the declaration using implicit evidence parameters. So your code is transformed into something like this:

class TypedKeyedDeserializationSchema[K, V](implicit val kDeserEv: Deser[K],
                                            val kTypeInfoEv: TypeInformation[K],
                                            val vDeserEv: Deser[V],
                                            val vTypeInfoEv: TypeInformation[V]) extends KeyedDeserializationSchema[KafkaMsg[K, V]] {

  def deserialize(key: Array[Byte],
                  value: Array[Byte],
                  topic: String,
                  partition: Int,
                  offset: Long
                 ): KafkaMsg[K, V] =
    KafkaMsg(kDeserEv.deser(key),
      vDeserEv.deser(value),
      topic,
      partition,
      offset
    )

  def isEndOfStream(e: KafkaMsg[K, V]): Boolean = false

  def getProducedType(): TypeInformation[KafkaMsg[K, V]] = createTypeInformation
}
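
Correspondingly, the instantiation new TypedKeyedDeserializationSchema[String, Long] in main has its four evidence arguments filled in by the compiler. Here is a sketch of the roughly equivalent explicit form (not the compiler's literal output):

// The compiler supplies the evidence values in declaration order:
new TypedKeyedDeserializationSchema[String, Long](
  Deser.stringDeser,              // Deser[String]: does NOT extend Serializable
  createTypeInformation[String],  // TypeInformation IS Serializable
  Deser.longDeser,                // Deser[Long]: does NOT extend Serializable
  createTypeInformation[Long]     // TypeInformation IS Serializable
)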

Now it is obvious that an object of type TypedKeyedDeserializationSchema[String, Long] actually contains two fields of type Deser[String] and Deser[Long], holding the values of the implicit vals you declared above. So when Flink tries to ensure that the object you pass to it is Serializable, the check fails.
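
You can reproduce the failing check in isolation with plain Java serialization, which is essentially what ClosureCleaner.ensureSerializable does via InstantiationUtil.serializeObject in the stack trace above. A minimal self-contained sketch:

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializabilityCheck extends App {
  // Same shape as the implicit vals above: an anonymous subclass of a
  // trait that does not extend Serializable.
  trait Deser[A] { def deser(a: Array[Byte]): A }

  val stringDeser: Deser[String] = new Deser[String] {
    def deser(a: Array[Byte]): String = ""
  }

  val out = new ObjectOutputStream(new ByteArrayOutputStream)
  try out.writeObject(stringDeser)
  catch {
    case e: NotSerializableException =>
      println(s"Fails just like inside Flink: $e")
  }
}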

Now the solution is obvious: make your trait Deser[A] extend Serializable:

trait Deser[A] extends Serializable {
  def deser(a: Array[Byte]): A
}
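
With this one-line change, the anonymous classes behind stringDeser and longDeser inherit Serializable, and the check from the stack trace passes. A quick sanity check (reusing the imports from the sketch above):

val out = new ObjectOutputStream(new ByteArrayOutputStream)
out.writeObject(Deser.stringDeser) // no NotSerializableException anymore

The KafkaMsg case class needs no such change, because Scala case classes already extend Serializable.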
