Barry Zhong

Reputation: 570

Akka Projection + Akka Serialization cause JVM crash with StackOverflowError

I am new to Akka Projection and Akka Serialization. I applied the sixth part of the official shopping cart tutorial to my own project, and the JVM crashed on the first run. Where should I look?

The event-sourcing journal plugin runs on MongoDB; Akka Projection runs on JDBC.

val akkaVersion = "2.6.20"
val AkkaProjectionVersion = "1.2.5"
val ScalikeJdbcVersion = "3.5.0"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-cluster-typed" % akkaVersion,
  "com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion,
  "com.typesafe.akka" %% "akka-stream" % akkaVersion,
  "com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion % Test,
  "com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % Test,

  // Akka Management powers Health Checks and Akka Cluster Bootstrapping
  "com.lightbend.akka.management" %% "akka-management" % AkkaManagementVersion,
  "com.lightbend.akka.management" %% "akka-management-cluster-http" % AkkaManagementVersion,
  "com.lightbend.akka.management" %% "akka-management-cluster-bootstrap" % AkkaManagementVersion,
  "com.lightbend.akka.discovery" %% "akka-discovery-kubernetes-api" % AkkaManagementVersion,
  "com.typesafe.akka" %% "akka-discovery" % akkaVersion,
  // Common Dependencies
  "com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
  "ch.qos.logback" % "logback-classic" % "1.2.11",
  "org.scalatest" %% "scalatest" % "3.1.0" % Test,
  // Akka Persistence
  "com.typesafe.akka" %% "akka-persistence-typed" % akkaVersion,
  "com.github.scullxbones" %% "akka-persistence-mongo-scala" % "3.0.8",
  "com.github.scullxbones" %% "akka-persistence-mongo-rxmongo" % "3.0.8",
  "com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion,
  "com.typesafe.akka" %% "akka-persistence-testkit" % akkaVersion % Test,

  // 4. Querying or projecting data from Akka Persistence
  "com.typesafe.akka" %% "akka-persistence-query" % akkaVersion,
  "com.lightbend.akka" %% "akka-projection-eventsourced" % AkkaProjectionVersion,
  "com.lightbend.akka" %% "akka-projection-jdbc" % AkkaProjectionVersion,
  "org.scalikejdbc" %% "scalikejdbc" % ScalikeJdbcVersion,
  "org.scalikejdbc" %% "scalikejdbc-config" % ScalikeJdbcVersion,
  "mysql" % "mysql-connector-java" % "5.1.34",
  "com.lightbend.akka" %% "akka-projection-testkit" % AkkaProjectionVersion % Test
)

The error log shows only a serialization issue:

received message: InitF(Kids, Member(5f6e16da,bz16716,None,None,Some(2f55a5),None,MemberStatus(1,Available),DateOnly(2022,12,22),None),Actor[akka://application/temp/FEn-4a4147e0$d#0])
2022-12-22 02:19:11 DEBUG iMadz  actor [akka://application/system/sharding/FEn/336/4a4147e0] received message: WriteMessagesSuccessful
2022-12-22 02:19:11 DEBUG iMadz  actor [akka://application/system/sharding/FamilyEntity/336/4a4147e0] received message: WriteMessageSuccess(PersistentRepr(FEn|4a4147e0,1,2ae2981eabd9,0,None),3)
2022-12-22 02:19:11 INFO  a.a.t.i.LogMessagesInterceptor  actor [akka://application/system/sharding/RegistrationService/795/97db1846570837fce6ff62a408f1c26a] received message: RegistrationDone(UserProfile(572f55a5,barry0433),GHOST,fab07689,Actor[akka://application/temp/RegistrationService-97db1846570837fce6ff62a408f1c26a$a#0])
2022-12-22 02:19:11 INFO  a.a.t.i.LogMessagesInterceptor  actor [akka://application/system/sharding/RegistrationService/795/97db1846570837fce6ff62a408f1c26a] received message: WriteMessagesSuccessful
2022-12-22 02:19:11 INFO  a.a.t.i.LogMessagesInterceptor  actor [akka://application/system/sharding/RegistrationService/795/97db1846570837fce6ff62a408f1c26a] received message: WriteMessageSuccess(PersistentRepr(RegistrationService|97db1846570837fce6ff62a408f1c26a,1,7d87389b-277b-41f2-a9cd-fee3303b1ae6,0,None),1)
Uncaught error from thread [application-akka.projection.jdbc.blocking-jdbc-dispatcher-77]: null, shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[application]
java.lang.StackOverflowError
    at akka.serialization.Serialization.serializerFor(Serialization.scala:305)
    at akka.serialization.Serialization.findSerializerFor(Serialization.scala:292)
    at akka.persistence.query.internal.QuerySerializer.toStorageRepresentation(QuerySerializer.scala:155)
    at akka.persistence.query.internal.QuerySerializer.manifest(QuerySerializer.scala:54)
    at akka.serialization.Serializers$.manifestFor(Serializer.scala:85)
    at akka.persistence.query.internal.QuerySerializer.toStorageRepresentation(QuerySerializer.scala:157)
    at akka.persistence.query.internal.QuerySerializer.manifest(QuerySerializer.scala:54)
    at akka.serialization.Serializers$.manifestFor(Serializer.scala:85)
    at akka.persistence.query.internal.QuerySerializer.toStorageRepresentation(QuerySerializer.scala:157)

The event here is quite simple:

  sealed trait FEvent extends CborSerializable
  final case class FCreated(id: Id, name: String, ownerId: Id, owner: Profile, createdOn: DateOnly) extends FEvent

Upvotes: 0

Views: 115

Answers (1)

Barry Zhong

Reputation: 570

The error occurred while serializing akka.contrib.persistence.mongodb.ObjectIdOffset: QuerySerializer cannot find a proper serializer for it and instead loops endlessly between its manifest and toStorageRepresentation methods, as the stack trace shows.

The fix can be as simple as the following:
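To check which serializer Akka resolves for the offset class, a small diagnostic along these lines may help. It is only a sketch: the object name `WhichSerializer` and the system name are made up for illustration, and it assumes the project's Akka and MongoDB plugin dependencies and its application.conf are on the classpath.

```scala
import akka.actor.ActorSystem
import akka.serialization.SerializationExtension

// Hypothetical helper, not part of the original project.
object WhichSerializer {
  def main(args: Array[String]): Unit = {
    // Loads application.conf, so the same serialization-bindings apply.
    val system = ActorSystem("serializer-check")
    try {
      // Looked up by name so this compiles even without the plugin at compile time.
      val clazz = Class.forName("akka.contrib.persistence.mongodb.ObjectIdOffset")
      val serializer = SerializationExtension(system).serializerFor(clazz)
      println(s"${clazz.getName} -> ${serializer.getClass.getName} (id ${serializer.identifier})")
    } finally {
      system.terminate()
    }
  }
}
```

If the printed serializer is QuerySerializer itself, that would be consistent with the recursion seen in the stack trace.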

    serialization-bindings {
      "docs.guide.CborSerializable" = jackson-cbor
      "akka.contrib.persistence.mongodb.ObjectIdOffset" = jackson-cbor
    }

However, this serializer encodes the whole ObjectIdOffset(hex: String, time: Long), and the Base64-encoded result makes the stored current offset longer than necessary. In fact, an ObjectIdOffset can be fully reconstructed from the 12-byte ObjectId given as a hex string, so a custom serializer can be provided as follows:

package docs.guide

import akka.contrib.persistence.mongodb.ObjectIdOffset
import akka.serialization.Serializer
import reactivemongo.api.bson.BSONObjectID

import java.math.BigInteger

class ObjectIdOffsetSerializer extends Serializer {
  // Must be unique among all serializers registered in the actor system.
  override def identifier: Int = 12345678

  // Only the ObjectId hex string is stored; the time is derived from it on read.
  override def toBinary(o: AnyRef): Array[Byte] = o match {
    case ObjectIdOffset(hexString, _) =>
      new BigInteger(hexString, 16).toByteArray
    case _ => throw new IllegalArgumentException("Only for ObjectIdOffset")
  }

  override def includeManifest: Boolean = true

  override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = {
    // BigInteger drops leading zeros, so pad back to the 24 hex characters of an ObjectId.
    val hex = new BigInteger(bytes).toString(16)
    BSONObjectID.parse(("0" * (24 - hex.length)) + hex)
      .map(id => ObjectIdOffset(id.stringify, id.time))
      .getOrElse(throw new IllegalStateException("Only for ObjectIdOffset"))
  }
}
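The hex-to-bytes round trip that this serializer relies on can be checked without Akka or MongoDB. The following is a minimal sketch with made-up sample ids; `HexRoundTripSketch` and its helpers are illustrative names, not part of any library.

```scala
import java.math.BigInteger

object HexRoundTripSketch {
  // Same encoding as the serializer: hex string -> two's-complement bytes.
  def toBytes(hex: String): Array[Byte] = new BigInteger(hex, 16).toByteArray

  // Decode as an unsigned magnitude and restore leading zeros (24 hex chars).
  def toHex(bytes: Array[Byte]): String = {
    val hex = new BigInteger(1, bytes).toString(16)
    ("0" * (24 - hex.length)) + hex
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample ids: typical, leading zero byte, high bit set.
    val samples = Seq(
      "63a3b0ef1234567890abcdef",
      "00a3b0ef1234567890abcdef",
      "ffa3b0ef1234567890abcdef"
    )
    samples.foreach { s =>
      val bytes = toBytes(s)
      assert(toHex(bytes) == s)
      println(s"$s -> ${bytes.length} bytes")
    }
  }
}
```

Note that BigInteger.toByteArray does not always return exactly 12 bytes: an id whose first byte is 0x80 or above gets an extra sign byte, and leading zero bytes are dropped, so decoding has to restore the fixed width.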

configuration file:

  actor {
    serializers {
      jackson-json = "akka.serialization.jackson.JacksonJsonSerializer"
      jackson-cbor = "akka.serialization.jackson.JacksonCborSerializer"
      object-id-offset = "docs.guide.ObjectIdOffsetSerializer"
    }
    serialization-bindings {
      "docs.guide.CborSerializable" = jackson-cbor
      "akka.contrib.persistence.mongodb.ObjectIdOffset" = object-id-offset
    }
  }

With this in place, JdbcProjection stores the current offset as a 16-character string in the database, which could still be optimized for lower storage cost if necessary.
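The 16 characters are what Base64 produces for 12 bytes (4 output characters per 3 input bytes), assuming the offset is stored Base64-encoded as described above. A small sketch with a made-up sample id illustrates the arithmetic; `OffsetSizeSketch` is an illustrative name only.

```scala
import java.util.Base64

object OffsetSizeSketch {
  // Hypothetical sample ObjectId: 24 hex characters = 12 bytes.
  val sampleHex = "63a3b0ef1234567890abcdef"

  // Convert each pair of hex characters into one byte.
  def hexToBytes(hex: String): Array[Byte] =
    hex.grouped(2).map(pair => Integer.parseInt(pair, 16).toByte).toArray

  def main(args: Array[String]): Unit = {
    val bytes = hexToBytes(sampleHex)
    val encoded = Base64.getEncoder.encodeToString(bytes)
    // prints "12 bytes -> 16 Base64 chars"
    println(s"${bytes.length} bytes -> ${encoded.length} Base64 chars")
  }
}
```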

Upvotes: 1
