franDayz

Reputation: 943

cannot create top-level actor [clusterSingletonManager] from the outside on ActorSystem with custom user guardian

I'm working on a service which uses Akka Persistence for event sourcing. So far we've been successfully storing the events in a Cassandra journal. Now we want to implement CQRS by leveraging Akka Persistence Query. As a first approach we are trying to follow the cluster singleton pattern to have an actor streaming the stored events by tag. For now we have this rather simple actor to be wrapped as a singleton:

public class EventProcessor extends AbstractLoggingActor {
  private static final Logger LOG = LoggerFactory.getLogger(EventProcessor.class);

  private final CassandraReadJournal journal;

  public EventProcessor(ActorSystem system) {
    journal =
        PersistenceQuery.get(system)
            .getReadJournalFor(CassandraReadJournal.class, CassandraReadJournal.Identifier());

    journal
        .eventsByTag(OnBoardingBehavior.ENTITY_TYPE_KEY.name(), Offset.noOffset())
        .map(EventEnvelope::persistenceId)
        .to(Sink.foreach(this::logMessage))
        .run(system);
  }

  private void logMessage(String id) {
    LOG.info("########## Received persistenceId {}", id);
  }

  @Override
  public Receive createReceive() {
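    // This actor handles no messages; returning null here would break actor startup.
    return receiveBuilder().build();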
  }
}

And this is how we wrap the actor inside the guardian:

    akka.actor.ActorSystem classicSystem = context.getSystem().classicSystem();

    ClusterSingletonManagerSettings settings =
        ClusterSingletonManagerSettings.create(classicSystem);

    Props clusterSingletonManagerProps =
        ClusterSingletonManager.props(
            Props.create(EventProcessor.class, classicSystem),
            PoisonPill.getInstance(),
            settings);

    classicSystem.actorOf(clusterSingletonManagerProps, "clusterSingletonManager");

When we run the service, we get the following exception (on the actorOf line):

java.lang.UnsupportedOperationException: cannot create top-level actor [clusterSingletonManager] from the outside on ActorSystem with custom user guardian
at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:900)
at r.service.onboarding.actor.Guardian.initializeEventProcessor(Guardian.java:95)
at r.service.onboarding.actor.Guardian.<init>(Guardian.java:56)
at r.service.onboarding.actor.Guardian.lambda$create$745d95f3$1(Guardian.java:66)
at akka.actor.typed.javadsl.Behaviors$.$anonfun$setup$1(Behaviors.scala:47)
at a.a.t.i.BehaviorImpl$DeferredBehavior$$anon$1.apply(BehaviorImpl.scala:118)
at akka.actor.typed.Behavior$.start(Behavior.scala:168)
at a.a.t.i.InterceptorImpl$$anon$1.start(InterceptorImpl.scala:48)
at akka.actor.typed.BehaviorInterceptor.aroundStart(BehaviorInterceptor.scala:55)
at a.a.typed.internal.InterceptorImpl.preStart(InterceptorImpl.scala:71)
at a.a.typed.internal.InterceptorImpl$.$anonfun$apply$1(InterceptorImpl.scala:28)
at a.a.t.i.BehaviorImpl$DeferredBehavior$$anon$1.apply(BehaviorImpl.scala:118)
at akka.actor.typed.Behavior$.start(Behavior.scala:168)
at akka.actor.typed.Behavior$.interpret(Behavior.scala:275)
at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
at a.a.t.internal.adapter.ActorAdapter.handleMessage(ActorAdapter.scala:126)
at a.a.t.internal.adapter.ActorAdapter.aroundReceive(ActorAdapter.scala:106)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:573)
at akka.actor.ActorCell.invoke(ActorCell.scala:543)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:269)
at akka.dispatch.Mailbox.run(Mailbox.scala:230)
at akka.dispatch.Mailbox.exec(Mailbox.scala:242)
... 5 frames excluded

I'd like to mention that we are able to spawn other actors from within the same guardian, for example:

ActorRef<Command> actorRef =
        context.spawn(OnBoardingBehavior.create(uuid), "OnBoardingBehavior-" + uuid);

I'm completely new to Akka so any help would be much appreciated!

Upvotes: 1

Views: 1107

Answers (1)

franDayz

Reputation: 943

Finally figured it out. Apparently, when running on a typed actor system, I cannot use the old untyped (classic) approach with ClusterSingletonManager.props. The proper way to set up a singleton on a typed system is described here: https://doc.akka.io/docs/akka/current/typed/cluster-singleton.html

So my actor is now an AbstractBehavior:

public class EventProcessor extends AbstractBehavior<Void>

And this is the way I wrap it as a singleton:

    ClusterSingleton singleton = ClusterSingleton.get(context.getSystem());

    singleton.init(
        SingletonActor.of(EventProcessor.create(identityRequestAdapter), "eventProcessor"));
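For reference, the typed version of the whole actor might look roughly like the sketch below. It is only an illustration under a few assumptions: the `tag` parameter of `create` and the `startAsSingleton` helper are hypothetical names (the real `create` above takes an `identityRequestAdapter` that is not shown here), and the stream is the same `eventsByTag` pipeline from the question.

```java
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import akka.cluster.typed.ClusterSingleton;
import akka.cluster.typed.SingletonActor;
import akka.persistence.cassandra.query.javadsl.CassandraReadJournal;
import akka.persistence.query.EventEnvelope;
import akka.persistence.query.Offset;
import akka.persistence.query.PersistenceQuery;
import akka.stream.javadsl.Sink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventProcessor extends AbstractBehavior<Void> {
  // Static SLF4J logger, because the stream callback runs outside the actor's thread.
  private static final Logger LOG = LoggerFactory.getLogger(EventProcessor.class);

  // Hypothetical factory: the tag to stream is passed in by the caller.
  public static Behavior<Void> create(String tag) {
    return Behaviors.setup(context -> new EventProcessor(context, tag));
  }

  // Hypothetical helper: registers the behavior as a cluster singleton.
  // It can be called from the guardian with context.getSystem().
  public static ActorRef<Void> startAsSingleton(ActorSystem<?> system, String tag) {
    return ClusterSingleton.get(system)
        .init(SingletonActor.of(create(tag), "eventProcessor"));
  }

  private EventProcessor(ActorContext<Void> context, String tag) {
    super(context);
    akka.actor.ActorSystem classicSystem = context.getSystem().classicSystem();

    CassandraReadJournal journal =
        PersistenceQuery.get(classicSystem)
            .getReadJournalFor(CassandraReadJournal.class, CassandraReadJournal.Identifier());

    // Same pipeline as in the question: stream events by tag and log each persistenceId.
    journal
        .eventsByTag(tag, Offset.noOffset())
        .map(EventEnvelope::persistenceId)
        .to(Sink.foreach(id -> LOG.info("Received persistenceId {}", id)))
        .run(classicSystem);
  }

  @Override
  public Receive<Void> createReceive() {
    // The actor only hosts the stream, so it handles no messages.
    return newReceiveBuilder().build();
  }
}
```

With this shape, the guardian would only need something like `EventProcessor.startAsSingleton(context.getSystem(), OnBoardingBehavior.ENTITY_TYPE_KEY.name())`. One thing to keep in mind: because the stream is run with the actor system rather than tied to the actor, it is not stopped automatically when the singleton actor stops, so a real implementation would probably want to shut it down explicitly.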

Upvotes: 1
