Reputation: 6611
We are trying to use protobuf with Akka and serialize all Akka messages via protobuf. For Scala, there is a library called ScalaPB that generates the message classes, which include methods such as parseFrom and toByteArray for serializing and deserializing our data. However, when we run the program, we get the following exception:
akka.actor.dungeon.SerializationCheckFailedException: Failed to serialize and deserialize message of type com.knoldus.akkaserialization.msg.example.Bang$ for testing. To avoid this error, either disable 'akka.actor.serialize-messages', mark the message with 'akka.actor.NoSerializationVerificationNeeded', or configure serialization to support this message
The application.conf file contains the following configuration:
akka {
  actor {
    allow-java-serialization = off
    serialize-messages = on
    serializers {
      proto = "akka.remote.serialization.ProtobufSerializer"
    }
    serialization-bindings {
      "com.knoldus.akkaserialization.msg.example.Bang" = proto
      "com.knoldus.akkaserialization.msg.example.Message" = proto
    }
  }
}
The classes com.knoldus.akkaserialization.msg.example.Bang and com.knoldus.akkaserialization.msg.example.Message are generated by ScalaPB and contain all the required methods.
The source code of akka.remote.serialization.ProtobufSerializer states:
This Serializer serializes `akka.protobuf.Message` and `com.google.protobuf.Message` It is using reflection to find the `parseFrom` and `toByteArray` methods to avoid dependency to `com.google.protobuf`
So we expected it to pick up our case classes Bang and Message automatically and perform the serialization, but unfortunately we get the serialization exception instead.
Could you help us figure out what exactly the problem is between ScalaPB and protobuf?
Upvotes: 0
Views: 2216
Reputation: 1007
With ScalaPB, you need to provide a Marshaller for HTTP requests/responses to be serialised.
This will allow Akka HTTP to automatically serialise/deserialise your requests directly into Proto message types.
For example, a Proto marshaller would look like:
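import akka.http.scaladsl.marshalling.{PredefinedToEntityMarshallers, ToEntityMarshaller}
import akka.http.scaladsl.unmarshalling.{FromEntityUnmarshaller, Unmarshaller}
import scalapb.{GeneratedMessage, GeneratedMessageCompanion}

// T is the request message type, E is the response message type.
trait ProtobufMarshalling[T <: GeneratedMessage, E <: GeneratedMessage] {
  implicit def protobufMarshaller: ToEntityMarshaller[E] =
    PredefinedToEntityMarshallers.ByteArrayMarshaller.compose[E](r => r.toByteArray)
  implicit def protobufUnmarshaller(implicit companion: GeneratedMessageCompanion[T]): FromEntityUnmarshaller[T] =
    Unmarshaller.byteArrayUnmarshaller.map[T](bytes => companion.parseFrom(bytes))
}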
Then mix ProtobufMarshalling into your Routes object. This brings the marshallers into implicit scope inside the object.
Assuming proto messages called "ProtoRequest" and "ProtoResponse", an example Route object would look like:
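import akka.http.scaladsl.server.Directives._

// An object's first parent is introduced with `extends`; further traits use `with`.
object MyRoutes extends ProtobufMarshalling[ProtoRequest, ProtoResponse] {
  lazy val apiRoutes =
    pathPrefix("api") {
      path("test") {
        pathEndOrSingleSlash {
          post {
            entity(as[ProtoRequest]) { protoRequest =>
              complete(ProtoResponse())
            }
          }
        }
      }
    }
}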
Upvotes: 0
Reputation: 1107
Here is an easy way to do it: just add the following lines to your configuration.
https://doc.akka.io/docs/akka/current/serialization.html
Akka provides serializers for several primitive types and protobuf com.google.protobuf.GeneratedMessage (protobuf2) and com.google.protobuf.GeneratedMessageV3 (protobuf3) by default (the latter only if depending on the akka-remote module), so normally you don’t need to add configuration for that if you send raw protobuf messages as actor messages.
ScalaPB-generated code can also be serialized to protobuf, so we just need to bind the traits of the ScalaPB-generated case classes to the serializer.
akka {
  actor {
    serialization-bindings {
      "com.google.protobuf.Message" = proto
      "scalapb.GeneratedMessage" = proto
      "scalapb.GeneratedEnum" = proto
    }
  }
}
This works for me. My environment is:
Upvotes: 0
Reputation: 6582
The serializer you are using was designed to work with Java protobufs, not with ScalaPB. You need to include your own serializer. Here is one you can use:
package com.example.protoser

import java.util.concurrent.atomic.AtomicReference

import akka.actor.ExtendedActorSystem
import akka.serialization.BaseSerializer
import scalapb.GeneratedMessageCompanion

class ScalaPbSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
  // Cache of message class -> companion object, updated lock-free via compare-and-set.
  private val classToCompanionMapRef = new AtomicReference[Map[Class[_], GeneratedMessageCompanion[_]]](Map.empty)

  override def toBinary(o: AnyRef): Array[Byte] = o match {
    case e: scalapb.GeneratedMessage => e.toByteArray
    case _ => throw new IllegalArgumentException("Need a subclass of scalapb.GeneratedMessage")
  }

  // The manifest (the message class) is required to find the companion when deserializing.
  override def includeManifest: Boolean = true

  override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = {
    manifest match {
      case Some(clazz) =>
        @scala.annotation.tailrec
        def messageCompanion(companion: GeneratedMessageCompanion[_] = null): GeneratedMessageCompanion[_] = {
          val classToCompanion = classToCompanionMapRef.get()
          classToCompanion.get(clazz) match {
            case Some(cachedCompanion) => cachedCompanion
            case None =>
              val uncachedCompanion =
                if (companion eq null)
                  // The companion object is the static MODULE$ field of the class named "<Message>$".
                  Class.forName(clazz.getName + "$", true, clazz.getClassLoader)
                    .getField("MODULE$").get(null).asInstanceOf[GeneratedMessageCompanion[_]]
                else companion
              // Retry if another thread updated the cache in the meantime.
              if (classToCompanionMapRef.compareAndSet(classToCompanion, classToCompanion.updated(clazz, uncachedCompanion)))
                uncachedCompanion
              else
                messageCompanion(uncachedCompanion)
          }
        }
        messageCompanion().parseFrom(bytes).asInstanceOf[AnyRef]
      case _ => throw new IllegalArgumentException("Need a ScalaPB companion class to be able to deserialize.")
    }
  }
}
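The companion lookup inside fromBinary can be tried out in isolation. Below is a minimal sketch that uses only the standard library; DemoCompanion, Ping and CompanionCache are hypothetical stand-ins (not ScalaPB or Akka types) for a generated message, its companion and the serializer's cache. It assumes the classes are compiled as top-level definitions, so that the companion's class is named "Ping$" and exposes a static MODULE$ field.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for scalapb.GeneratedMessageCompanion.
trait DemoCompanion {
  def parseFrom(bytes: Array[Byte]): AnyRef
}

// Hypothetical stand-in for a ScalaPB-generated message.
final case class Ping(text: String) {
  def toByteArray: Array[Byte] = text.getBytes("UTF-8")
}

object Ping extends DemoCompanion {
  def parseFrom(bytes: Array[Byte]): Ping = Ping(new String(bytes, "UTF-8"))
}

object CompanionCache {
  private val cache = new AtomicReference[Map[Class[_], DemoCompanion]](Map.empty)

  // `object Ping` is compiled to a class named "Ping$" whose singleton
  // instance is stored in the static field MODULE$.
  private def load(clazz: Class[_]): DemoCompanion =
    Class.forName(clazz.getName + "$", true, clazz.getClassLoader)
      .getField("MODULE$")
      .get(null) // static field, so no receiver instance is needed
      .asInstanceOf[DemoCompanion]

  // Lock-free lookup: if another thread changed the map first,
  // the compare-and-set fails and the lookup is retried.
  @scala.annotation.tailrec
  def companionFor(clazz: Class[_]): DemoCompanion = {
    val snapshot = cache.get()
    snapshot.get(clazz) match {
      case Some(found) => found
      case None =>
        val loaded = load(clazz)
        if (cache.compareAndSet(snapshot, snapshot.updated(clazz, loaded))) loaded
        else companionFor(clazz)
    }
  }
}
```

With these definitions, CompanionCache.companionFor(classOf[Ping]).parseFrom(Ping("bang").toByteArray) goes through the same steps as the serializer: find the companion from the manifest class, cache it, then parse the bytes.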
The configuration should be something like this:
akka {
  actor {
    serializers {
      scalapb = "com.example.protoser.ScalaPbSerializer"
    }
    serialization-bindings {
      "scalapb.GeneratedMessage" = scalapb
    }
    serialization-identifiers {
      "com.example.protoser.ScalaPbSerializer" = 10000
    }
  }
}
The above was adapted from old code, so edits and suggestions are welcome!
Upvotes: 4