Reputation: 570
I am new to Akka Projection + Akka Serialization, I applied official shopping cart 6th tutorial into my own project, and at first shot jvm crashed. I wish to know where should I look into.
Event Source Journal Plugin is on Mongodb Akka Projection is on Jdbc
val akkaVersion = "2.6.20"
val AkkaProjectionVersion = "1.2.5"
val ScalikeJdbcVersion = "3.5.0"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-cluster-typed" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion % Test,
"com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % Test,
// Akka Management powers Health Checks and Akka Cluster Bootstrapping
"com.lightbend.akka.management" %% "akka-management" % AkkaManagementVersion,
"com.lightbend.akka.management" %% "akka-management-cluster-http" % AkkaManagementVersion,
"com.lightbend.akka.management" %% "akka-management-cluster-bootstrap" % AkkaManagementVersion,
"com.lightbend.akka.discovery" %% "akka-discovery-kubernetes-api" % AkkaManagementVersion,
"com.typesafe.akka" %% "akka-discovery" % akkaVersion,
// Common Dependencies
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
"ch.qos.logback" % "logback-classic" % "1.2.11",
"org.scalatest" %% "scalatest" % "3.1.0" % Test,
// Akka Persistence
"com.typesafe.akka" %% "akka-persistence-typed" % akkaVersion,
"com.github.scullxbones" %% "akka-persistence-mongo-scala" % "3.0.8",
"com.github.scullxbones" %% "akka-persistence-mongo-rxmongo" % "3.0.8",
"com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion,
"com.typesafe.akka" %% "akka-persistence-testkit" % akkaVersion % Test,
// 4. Querying or projecting data from Akka Persistence
"com.typesafe.akka" %% "akka-persistence-query" % akkaVersion,
"com.lightbend.akka" %% "akka-projection-eventsourced" % AkkaProjectionVersion,
"com.lightbend.akka" %% "akka-projection-jdbc" % AkkaProjectionVersion,
"org.scalikejdbc" %% "scalikejdbc" % ScalikeJdbcVersion,
"org.scalikejdbc" %% "scalikejdbc-config" % ScalikeJdbcVersion,
"mysql" % "mysql-connector-java" % "5.1.34",
"com.lightbend.akka" %% "akka-projection-testkit" % AkkaProjectionVersion % Test
The error logs shows only serialization issue:
received message: InitF(Kids, Member(5f6e16da,bz16716,None,None,Some(2f55a5),None,MemberStatus(1,Available),DateOnly(2022,12,22),None),Actor[akka://application/temp/FEn-4a4147e0$d#0])
2022-12-22 02:19:11 DEBUG iMadz actor [akka://application/system/sharding/FEn/336/4a4147e0] received message: WriteMessagesSuccessful
2022-12-22 02:19:11 DEBUG iMadz actor [akka://application/system/sharding/FamilyEntity/336/4a4147e0] received message: WriteMessageSuccess(PersistentRepr(FEn|4a4147e0,1,2ae2981eabd9,0,None),3)
2022-12-22 02:19:11 INFO a.a.t.i.LogMessagesInterceptor actor [akka://application/system/sharding/RegistrationService/795/97db1846570837fce6ff62a408f1c26a] received message: RegistrationDone(UserProfile(572f55a5,barry0433),GHOST,fab07689,Actor[akka://application/temp/RegistrationService-97db1846570837fce6ff62a408f1c26a$a#0])
2022-12-22 02:19:11 INFO a.a.t.i.LogMessagesInterceptor actor [akka://application/system/sharding/RegistrationService/795/97db1846570837fce6ff62a408f1c26a] received message: WriteMessagesSuccessful
2022-12-22 02:19:11 INFO a.a.t.i.LogMessagesInterceptor actor [akka://application/system/sharding/RegistrationService/795/97db1846570837fce6ff62a408f1c26a] received message: WriteMessageSuccess(PersistentRepr(RegistrationService|97db1846570837fce6ff62a408f1c26a,1,7d87389b-277b-41f2-a9cd-fee3303b1ae6,0,None),1)
Uncaught error from thread [application-akka.projection.jdbc.blocking-jdbc-dispatcher-77]: null, shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[application]
java.lang.StackOverflowError
at akka.serialization.Serialization.serializerFor(Serialization.scala:305)
at akka.serialization.Serialization.findSerializerFor(Serialization.scala:292)
at akka.persistence.query.internal.QuerySerializer.toStorageRepresentation(QuerySerializer.scala:155)
at akka.persistence.query.internal.QuerySerializer.manifest(QuerySerializer.scala:54)
at akka.serialization.Serializers$.manifestFor(Serializer.scala:85)
at akka.persistence.query.internal.QuerySerializer.toStorageRepresentation(QuerySerializer.scala:157)
at akka.persistence.query.internal.QuerySerializer.manifest(QuerySerializer.scala:54)
at akka.serialization.Serializers$.manifestFor(Serializer.scala:85)
at akka.persistence.query.internal.QuerySerializer.toStorageRepresentation(QuerySerializer.scala:157)
Event Here is quite simple:
sealed trait FEvent extends CborSerializable
final case class FCreated(id: Id, name: String, ownerId: Id, owner: Profile, createdOn: DateOnly) extends FEvent
Upvotes: 0
Views: 115
Reputation: 570
The error occurred while serializing akka.contrib.persistence.mongodb.ObjectIdOffset, and QuerySerializer cannot find a proper serializer but infinite looping the manifest/serialize methods.
The fix can be easy as following:
serialization-bindings {
"docs.guide.CborSerializable" = jackson-cbor
"akka.contrib.persistence.mongodb.ObjectIdOffset" = jackson-cbor
}
but this serialization will base64.encode the ObjectIdOffset(hex:String, time: Long), and generate a longer current offset physical format. In fact ObjectIdOffset can be fully regenerated by the 12 bytes hex 'string', so a custom serializer can be provided as following:
package docs.guide
import akka.contrib.persistence.mongodb.ObjectIdOffset
import akka.serialization.Serializer
import reactivemongo.api.bson.BSONObjectID
import java.math.BigInteger
class ObjectIdOffsetSerializer extends Serializer {
override def identifier: Int = 12345678
override def toBinary(o: AnyRef): Array[Byte] = o match {
case ObjectIdOffset(hexString, _) =>
new BigInteger(hexString, 16).toByteArray
case _ => throw new IllegalArgumentException("Only for ObjectIdOffset")
}
override def includeManifest: Boolean = true
override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = {
BSONObjectID.parse((new BigInteger(bytes)).toString(16))
.map(id => ObjectIdOffset(id.stringify, id.time))
.getOrElse(throw new IllegalStateException("Only for ObjectIdOffset"))
}
}
configuration file:
actor {
serializers {
jackson-json = "akka.serialization.jackson.JacksonJsonSerializer"
jackson-cbor = "akka.serialization.jackson.JacksonCborSerializer"
object-id-offset = "docs.guide.ObjectIdOffsetSerializer"
}
serialization-bindings {
"docs.guide.CborSerializable" = jackson-cbor
"akka.contrib.persistence.mongodb.ObjectIdOffset" = object-id-offset
}
and eventually JdbcProjection will make current-offset a 16 chars string in database, which still could be optimized with less storage cost if neccessary
Upvotes: 1