Reputation: 4842
I have a persistent actor which can receive one type of command, Persist(event), where event is a subtype of the trait Event (there are numerous implementations of it). On success, it responds with Persisted(event) to the sender.
The event itself is serializable, since this is the data we store in persistence storage. The serialization is implemented with a custom serializer which internally uses classes generated from Google Protobuf .proto files. This custom serializer is configured in application.conf and bound to the base trait Event. This is already working fine.
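For reference, the binding described above might look roughly like this in application.conf (the serializer class and alias names here are assumptions, not the actual ones from the project):

```hocon
akka {
  actor {
    serializers {
      # assumed name for the existing custom event serializer
      event-serializer = "com.example.ManifestAwareEventSerializer"
    }
    serialization-bindings {
      # every subtype of Event goes through the custom serializer
      "com.example.Event" = event-serializer
    }
  }
}
```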
Note: the implementations of Event are not classes generated by protobuf. They are normal Scala classes, and each has a protobuf equivalent, but that mapping is handled inside the custom serializer bound to the base Event type. My predecessors did this for versioning (which probably isn't required, since this could also be handled with a plain protobuf classes + custom serialization combination, but that's a different matter), and I don't wish to change it at the moment.
We're now trying to implement cluster sharding for this actor, which means that my commands (viz. Persist and Persisted) also need to be serializable, since they may be forwarded to other nodes.
This is the domain model:
sealed trait PersistenceCommand {
  def event: Event
}
final case class Persisted(event: Event) extends PersistenceCommand
final case class Persist(event: Event) extends PersistenceCommand
The problem is, I do not see an elegant way to make them serializable. These are the options I have considered.
Approach 1. Define a new proto file for Persist and Persisted. But what do I use as the datatype for event? I didn't find a way to define something like this:
message Persist {
  "com.example.Event" event = 1 // this doesn't work
}
such that I can use the existing Scala trait Event as a data type. If this worked, I guess (it's far-fetched, though) I could bind the generated code (after compiling this proto file) to Akka's built-in serializer for Google Protobuf, and it might work. The note above explains why I cannot use the oneof construct in my proto file.
Approach 2. This is what I've implemented, and it works (but I don't like it).
Basically, I wrote a new serializer for the commands and delegated serialization and deserialization of the event part of the command to the existing serializer.
class PersistenceCommandSerializer extends SerializerWithStringManifest {
  val eventSerializer: ManifestAwareEventSerializer = new ManifestAwareEventSerializer()
  val PersistManifest = Persist.getClass.getName
  val PersistedManifest = Persisted.getClass.getName
  val Separator = "~"

  override def identifier: Int = 808653986

  override def manifest(o: AnyRef): String = o match {
    case Persist(event)   => s"$PersistManifest$Separator${eventSerializer.manifest(event)}"
    case Persisted(event) => s"$PersistedManifest$Separator${eventSerializer.manifest(event)}"
  }

  override def toBinary(o: AnyRef): Array[Byte] = o match {
    case command: PersistenceCommand => eventSerializer.toBinary(command.event)
  }

  override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
    val (commandManifest, dataManifest) = splitIntoCommandAndDataManifests(manifest)
    val event = eventSerializer.fromBinary(bytes, dataManifest).asInstanceOf[Event]
    commandManifest match {
      case PersistManifest   => Persist(event)
      case PersistedManifest => Persisted(event)
    }
  }

  private def splitIntoCommandAndDataManifests(manifest: String) = {
    val commandAndDataManifests = manifest.split(Separator)
    (commandAndDataManifests(0), commandAndDataManifests(1))
  }
}
The problem with this approach is what I'm doing in def manifest and in def fromBinary. I have to carry both the command's manifest and the event's manifest through serialization and deserialization, hence the ~ separator: in effect, my own custom serialization scheme for the manifest information.
Is there a better or perhaps, a right way, to implement this?
For context: I'm using ScalaPB for generating Scala classes from .proto files, and classic Akka actors.
Any kind of guidance is hugely appreciated!
Upvotes: 0
Views: 941
Reputation: 11479
If you delegate serialization of the nested object to whichever serializer you have configured, the nested field should have bytes for the serialized data, but also an int32 with the id of the serializer used and bytes for the message manifest. This ensures that you will be able to version/replace the nested serializers, which is important for data that will be stored for a longer time period.
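Adapted to the commands from the question, that envelope might look roughly like this (field names and numbers here are illustrative, not Akka's exact schema):

```protobuf
syntax = "proto3";

// Envelope carrying the delegated serializer's output plus enough
// metadata to pick the right serializer again on the receiving side.
message EventPayload {
  bytes enclosedEvent = 1;  // event bytes produced by the bound serializer
  int32 serializerId = 2;   // id of the serializer that produced them
  bytes eventManifest = 3;  // the event's own manifest
}

message Persist {
  EventPayload event = 1;
}

message Persisted {
  EventPayload event = 1;
}
```

At read time the serializerId is looked up through Akka's Serialization extension, so the command serializer never needs to know which concrete event serializer was used, and no manifest-string separator tricks are required.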
You can see how this is done internally in Akka for our own wire formats here: https://github.com/akka/akka/blob/6bf20f4117a8c27f8bd412228424caafe76a89eb/akka-remote/src/main/protobuf/WireFormats.proto#L48 and here https://github.com/akka/akka/blob/6bf20f4117a8c27f8bd412228424caafe76a89eb/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala#L45
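To make the idea concrete without pulling in Akka, here is a minimal, self-contained sketch of the envelope in plain Scala: it packs (serializer id, event manifest, event bytes) into one byte array and recovers them, which is what the protobuf Payload message does for Akka internally. In real code the generated protobuf class would play this role.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.nio.charset.StandardCharsets.UTF_8

object PayloadEnvelope {
  // Pack the delegated serializer's id, the event manifest, and the event bytes.
  def pack(serializerId: Int, manifest: String, eventBytes: Array[Byte]): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val out = new DataOutputStream(bos)
    out.writeInt(serializerId)
    val m = manifest.getBytes(UTF_8)
    out.writeInt(m.length)
    out.write(m)
    out.write(eventBytes)
    out.flush()
    bos.toByteArray
  }

  // Recover (serializerId, manifest, eventBytes); the id is what you would use
  // to look up the right serializer via the Serialization extension at read time.
  def unpack(bytes: Array[Byte]): (Int, String, Array[Byte]) = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    val id = in.readInt()
    val manifestLength = in.readInt()
    val manifestBytes = new Array[Byte](manifestLength)
    in.readFully(manifestBytes)
    val eventBytes = new Array[Byte](bytes.length - 8 - manifestLength)
    in.readFully(eventBytes)
    (id, new String(manifestBytes, UTF_8), eventBytes)
  }
}
```

With this shape, the command serializer's own manifest only needs to say "Persist" or "Persisted"; everything about the nested event travels inside the payload.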
Upvotes: 1