Harmeet Singh Taara

Reputation: 6611

Scala/Akka/Protobuf: Failed to serialize and deserialize message

We are trying to use protobuf with Akka and serialize all Akka messages via protobuf. For Scala, there is a library called ScalaPB that generates classes with methods such as parseFrom and toByteArray for serializing and deserializing our data. However, when we run the program, we get the following exception:

akka.actor.dungeon.SerializationCheckFailedException: Failed to serialize and deserialize message of type com.knoldus.akkaserialization.msg.example.Bang$ for testing. To avoid this error, either disable 'akka.actor.serialize-messages', mark the message with 'akka.actor.NoSerializationVerificationNeeded', or configure serialization to support this message

The application.conf file contains the following configuration:

akka {
    actor {
        allow-java-serialization = off
        serialize-messages = on
        serializers {
            proto = "akka.remote.serialization.ProtobufSerializer"
        }

        serialization-bindings {
            "com.knoldus.akkaserialization.msg.example.Bang" = proto
            "com.knoldus.akkaserialization.msg.example.Message" = proto
        }
    }
}

The classes com.knoldus.akkaserialization.msg.example.Bang and com.knoldus.akkaserialization.msg.example.Message are generated by ScalaPB and contain all the required methods.

The source code of akka.remote.serialization.ProtobufSerializer states:

This Serializer serializes `akka.protobuf.Message` and `com.google.protobuf.Message` It is using reflection to find the `parseFrom` and `toByteArray` methods to avoid dependency to `com.google.protobuf`

So we expected it to pick up our case classes Bang and Message automatically and serialize them, but unfortunately we get a serialization exception.

Could you help us figure out what exactly the problem is between ScalaPB and protobuf?

Upvotes: 0

Views: 2216

Answers (3)

Kris Rice

Reputation: 1007

With ScalaPB, you need to provide a Marshaller so that HTTP requests and responses can be serialised.

This will allow Akka to automatically serialise/deserialise your requests directly into Proto message types.

For example, a Proto marshaller would look like:

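import akka.http.scaladsl.marshalling.{PredefinedToEntityMarshallers, ToEntityMarshaller}
import akka.http.scaladsl.unmarshalling.{FromEntityUnmarshaller, Unmarshaller}
import scalapb.{GeneratedMessage, GeneratedMessageCompanion}

// T is the request type to unmarshal, E is the response type to marshal.
trait ProtobufMarshalling[T <: GeneratedMessage, E <: GeneratedMessage] {
  implicit def protobufMarshaller: ToEntityMarshaller[E] = PredefinedToEntityMarshallers.ByteArrayMarshaller.compose[E](r => r.toByteArray)

  implicit def protobufUnmarshaller(implicit companion: GeneratedMessageCompanion[T]): FromEntityUnmarshaller[T] = {
    Unmarshaller.byteArrayUnmarshaller.map[T](bytes => companion.parseFrom(bytes))
  }
}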

Then mix ProtobufMarshalling into your Routes object. This brings the marshallers into scope automatically.

Assuming proto messages called "ProtoRequest" and "ProtoResponse", an example Route object would look like:

import akka.http.scaladsl.server.Directives._

object MyRoutes extends ProtobufMarshalling[ProtoRequest, ProtoResponse] {
    lazy val apiRoutes =
        pathPrefix("api") {
            path("test") {
                pathEndOrSingleSlash {
                    post {
                        entity(as[ProtoRequest]) { protoRequest =>
                            complete(ProtoResponse())
                        }
                    }
                }
            }
        }
}

Upvotes: 0

counter2015

Reputation: 1107

Here is an easy way to do it: just add the following lines to your configuration.

https://doc.akka.io/docs/akka/current/serialization.html

Akka provides serializers for several primitive types and protobuf com.google.protobuf.GeneratedMessage (protobuf2) and com.google.protobuf.GeneratedMessageV3 (protobuf3) by default (the latter only if depending on the akka-remote module), so normally you don’t need to add configuration for that if you send raw protobuf messages as actor messages.

ScalaPB-generated code can also be serialized to protobuf, so we just need to bind the traits of the ScalaPB-generated case classes to the serializer.

akka {
  actor {
    serialization-bindings {
      "com.google.protobuf.Message" = proto
      "scalapb.GeneratedMessage" = proto
      "scalapb.GeneratedEnum" = proto
    }
  }
}

This works for me. My environment is:

  • akka-grpc: 2.1.4
  • akka: 2.6.19
  • Scala: 2.13.8

Upvotes: 0

thesamet

Reputation: 6582

The serializer you are using was designed to work with Java protobufs, not with ScalaPB. You need to include your own serializer. Here is one you can use:

package com.example.protoser

import java.util.concurrent.atomic.AtomicReference

import akka.actor.ExtendedActorSystem
import akka.serialization.BaseSerializer
import scalapb.GeneratedMessageCompanion

class ScalaPbSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
  private val classToCompanionMapRef = new AtomicReference[Map[Class[_], GeneratedMessageCompanion[_]]](Map.empty)

  override def toBinary(o: AnyRef): Array[Byte] = o match {
    case e: scalapb.GeneratedMessage => e.toByteArray
    case _ => throw new IllegalArgumentException("Need a subclass of scalapb.GeneratedMessage")
  }

  override def includeManifest: Boolean = true

  override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = {
    manifest match {
      case Some(clazz) =>
        @scala.annotation.tailrec
        def messageCompanion(companion: GeneratedMessageCompanion[_] = null): GeneratedMessageCompanion[_] = {
          val classToCompanion = classToCompanionMapRef.get()
          classToCompanion.get(clazz) match {
            case Some(cachedCompanion) => cachedCompanion
            case None =>
              val uncachedCompanion =
                if (companion eq null) Class.forName(clazz.getName + "$", true, clazz.getClassLoader)
                  .getField("MODULE$").get(null).asInstanceOf[GeneratedMessageCompanion[_]]
                else companion
              if (classToCompanionMapRef.compareAndSet(classToCompanion, classToCompanion.updated(clazz, uncachedCompanion)))
                uncachedCompanion
              else
                messageCompanion(uncachedCompanion)
          }
        }
        messageCompanion().parseFrom(bytes).asInstanceOf[AnyRef]
      case _ => throw new IllegalArgumentException("Need a ScalaPB companion class to be able to deserialize.")
    }
  }
}
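
The companion lookup inside fromBinary relies on how Scala compiles objects: `object Foo` becomes a JVM class named `Foo$`, and its single instance is stored in the static field `MODULE$`. Below is a minimal sketch of that lookup in isolation. `Ping` is a hypothetical hand-written stand-in for a ScalaPB-generated message (it is not real generated code), and `CompanionLookupDemo` is a name made up for this illustration. It assumes the definitions are compiled as top-level definitions in a normal source file.

```scala
// Hypothetical stand-in for a ScalaPB-generated message (not real generated code).
final case class Ping(text: String) {
  def toByteArray: Array[Byte] = text.getBytes("UTF-8")
}

// The companion object carries parseFrom, just as ScalaPB companions do.
object Ping {
  def parseFrom(bytes: Array[Byte]): Ping = Ping(new String(bytes, "UTF-8"))
}

object CompanionLookupDemo {
  // Scala compiles `object Ping` to a JVM class named `Ping$`; the singleton
  // instance is held in the static field MODULE$, so Field.get takes null.
  def companionOf(clazz: Class[_]): AnyRef =
    Class.forName(clazz.getName + "$", true, clazz.getClassLoader)
      .getField("MODULE$")
      .get(null)

  def main(args: Array[String]): Unit = {
    val companion = companionOf(classOf[Ping])
    // The reflective lookup returns the very same object as `Ping`.
    assert(companion eq Ping)
    val restored = companion.asInstanceOf[Ping.type].parseFrom(Ping("hello").toByteArray)
    println(restored) // prints Ping(hello)
  }
}
```

The serializer above does the same thing, but caches the result per class so the reflection only happens once.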

The configuration should be something like this:

akka {
  actor {
    serializers {
      scalapb = "com.example.protoser.ScalaPbSerializer"
    }
 
    serialization-bindings {
      "scalapb.GeneratedMessage" = scalapb
    }

    serialization-identifiers {
      "com.example.protoser.ScalaPbSerializer" = 10000
    }
  }
}

The above was adapted from old code, so edits and suggestions are welcome!
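
To check that the binding is actually picked up, one option is to round-trip a message through Akka's SerializationExtension outside of any actor. This is only a sketch: `SerializationCheck`, `roundTrip` and `serializerNameFor` are names invented here, and it assumes akka-actor is on the classpath and the configuration above is loaded by the actor system.

```scala
import akka.actor.ActorSystem
import akka.serialization.SerializationExtension
import scala.util.Try

// Hypothetical helper for illustration; not part of Akka or ScalaPB.
object SerializationCheck {

  // Serializes msg and deserializes it again with whatever serializer
  // Akka has bound to the message's class.
  def roundTrip[T <: AnyRef](system: ActorSystem, msg: T): Try[T] = {
    val serialization = SerializationExtension(system)
    val clazz = msg.getClass.asInstanceOf[Class[T]]
    for {
      bytes    <- serialization.serialize(msg)
      restored <- serialization.deserialize(bytes, clazz)
    } yield restored
  }

  // Name of the serializer class Akka selects for msg; findSerializerFor
  // throws if no serializer is bound to the class.
  def serializerNameFor(system: ActorSystem, msg: AnyRef): String =
    SerializationExtension(system).findSerializerFor(msg).getClass.getName
}
```

With the configuration above, `SerializationCheck.serializerNameFor(system, Bang())` would be expected to report com.example.protoser.ScalaPbSerializer, and `SerializationCheck.roundTrip(system, Bang())` should succeed with a message equal to the original, assuming `Bang()` is a valid instance of the generated class from the question.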

Upvotes: 4
