Reputation: 337
I made a custom KeyedDeserializationSchema to deserialize Kafka messages, and I am using it like this:
import java.util.Properties

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema

object Job {
  case class KafkaMsg[K, V](
      key: K, value: V, topic: String, partition: Int, offset: Long)

  trait Deser[A] {
    def deser(a: Array[Byte]): A
  }

  object Deser {
    def apply[A](implicit sh: Deser[A]): Deser[A] = sh
    def deser[A: Deser](a: Array[Byte]): A = Deser[A].deser(a)

    // Stub instances, just enough to make the example compile.
    implicit val stringDeser: Deser[String] =
      new Deser[String] {
        def deser(a: Array[Byte]): String = ""
      }

    implicit val longDeser: Deser[Long] =
      new Deser[Long] {
        def deser(a: Array[Byte]): Long = 0
      }
  }

  class TypedKeyedDeserializationSchema[
      K: Deser: TypeInformation,
      V: Deser: TypeInformation
  ] extends KeyedDeserializationSchema[KafkaMsg[K, V]] {
    def deserialize(key: Array[Byte],
                    value: Array[Byte],
                    topic: String,
                    partition: Int,
                    offset: Long
    ): KafkaMsg[K, V] =
      KafkaMsg(Deser[K].deser(key),
               Deser[V].deser(value),
               topic,
               partition,
               offset
      )

    def isEndOfStream(e: KafkaMsg[K, V]): Boolean = false

    def getProducedType(): TypeInformation[KafkaMsg[K, V]] =
      createTypeInformation
  }

  def main(args: Array[String]): Unit = {
    val properties = new Properties
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "flink-test")

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env
      .addSource(new FlinkKafkaConsumer011(
        "topic",
        new TypedKeyedDeserializationSchema[String, Long],
        properties
      ))
      .print

    env.execute("Flink Scala API Skeleton")
  }
}
Which gives me:
[error] Caused by: java.io.NotSerializableException: l7.Job$Deser$$anon$7
[error] at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
[error] at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
[error] at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
[error] at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
[error] at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
[error] at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
[error] at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
[error] at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
[error] at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
[error] at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
[error] at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
[error] at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:170)
[error] at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
[error] at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:670)
[error] at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:600)
[error] at l7.Job$.main(Job.scala:89)
[error] at l7.Job.main(Job.scala)
The problem is obviously in my Deser type-class-like implementation, but I don't understand what exactly causes this error or how to fix it.
Upvotes: 1
Views: 4166
Reputation: 23788
Yes, the reason for this error is that your Deser, unlike TypeInformation, does not extend/implement Serializable. To find out why this happens, you might start by asking yourself a question: why do I need to declare implicit val stringDeser and implicit val longDeser?

The answer is what the Scala compiler does when it sees a generic constraint in the form of K: Deser: TypeInformation. It rewrites the constraint using implicit evidence objects, so your code is transformed into something like this:
class TypedKeyedDeserializationSchema[K, V](implicit
    val kDeserEv: Deser[K],
    val kTypeInfoEv: TypeInformation[K],
    val vDeserEv: Deser[V],
    val vTypeInfoEv: TypeInformation[V]
) extends KeyedDeserializationSchema[KafkaMsg[K, V]] {
  def deserialize(key: Array[Byte],
                  value: Array[Byte],
                  topic: String,
                  partition: Int,
                  offset: Long
  ): KafkaMsg[K, V] =
    KafkaMsg(kDeserEv.deser(key),
             vDeserEv.deser(value),
             topic,
             partition,
             offset
    )

  def isEndOfStream(e: KafkaMsg[K, V]): Boolean = false

  def getProducedType(): TypeInformation[KafkaMsg[K, V]] = createTypeInformation
}
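The same rewriting applies to the method-level context bound in your Deser companion object. A small sketch of the two equivalent forms (the names decode and decodeDesugared are mine, purely for illustration):

// What you write with a context bound...
def decode[A: Deser](bytes: Array[Byte]): A = Deser[A].deser(bytes)

// ...is what the compiler rewrites into an explicit implicit parameter:
def decodeDesugared[A](bytes: Array[Byte])(implicit ev: Deser[A]): A =
  ev.deser(bytes)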
Now it is obvious that an object of type TypedKeyedDeserializationSchema[String, Long] actually contains two fields of type Deser[String] and Deser[Long], whose values come from the implicit vals you declared above. So when Flink tries to ensure that the function you pass to it is Serializable, the check fails.
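You can reproduce the failing check without Flink at all. A minimal sketch (the names NotSerial, Holder, and ReproSketch are mine, for illustration): a Serializable holder whose field is an instance of a non-Serializable trait fails Java serialization exactly the way your schema does.

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object ReproSketch extends App {
  trait NotSerial { def deser(a: Array[Byte]): String }

  // The holder itself is Serializable (like KeyedDeserializationSchema),
  // but its field is not (like the Deser evidence values).
  class Holder(val ev: NotSerial) extends Serializable

  val holder = new Holder(new NotSerial {
    def deser(a: Array[Byte]): String = ""
  })

  // Throws java.io.NotSerializableException for the anonymous class,
  // just as Flink's ClosureCleaner.ensureSerializable does in the trace above.
  new ObjectOutputStream(new ByteArrayOutputStream).writeObject(holder)
}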
Now the solution is obvious: make your trait Deser[A] extend Serializable:

trait Deser[A] extends Serializable {
  def deser(a: Array[Byte]): A
}
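As a follow-up (my sketch, not part of the answer itself): once Deser extends Serializable, the stub instances can carry real decoding logic. This assumes the key is a UTF-8 string and the value is an 8-byte big-endian long; adjust to your actual wire format.

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

implicit val stringDeser: Deser[String] =
  new Deser[String] {
    def deser(a: Array[Byte]): String = new String(a, StandardCharsets.UTF_8)
  }

implicit val longDeser: Deser[Long] =
  new Deser[Long] {
    // Assumes the payload is an 8-byte big-endian long.
    def deser(a: Array[Byte]): Long = ByteBuffer.wrap(a).getLong
  }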
Upvotes: 2