user_s

Reputation: 1078

Akka ClusterSingletonProxy to a remote deployed singleton

I'm trying to send a message to a singleton actor that another actor deployed on a remote node.

This is the manager. It waits for a MemberUp event, deploys the Worker actor on that node, and then sends the singleton a message:

object Manager extends App {
  val sys = ActorSystem("mySys", ConfigFactory.load("application").getConfig("manager"))
  sys.actorOf(Props[Manager], "manager")
}

class Manager extends Actor with ActorLogging {
  override def receive: Receive = {
    case MemberUp(member) if member.address != Cluster(context.system).selfAddress =>
      context.system.actorOf(ClusterSingletonManager.props(
        singletonProps = Props(classOf[Worker]),
        singletonName = "worker",
        terminationMessage = End,
        role = Some("worker")).withDeploy(Deploy(scope = RemoteScope(member.address))))

      context.actorOf(ClusterSingletonProxy.props(
        singletonPath = s"/user/singleton/worker",
        role = Some(s"worker")), "worker") ! "hello"
  }
  override def preStart(): Unit = {
    Cluster(context.system).subscribe(self,classOf[MemberUp])
  }
}

This is the worker:

object Worker extends App{
  ActorSystem("mySys", ConfigFactory.load("application").getConfig("worker"))
}

class Worker extends Actor with ActorLogging {

  override def receive: Receive = {
    case msg =>
      println(s"GOT MSG : $msg from : ${sender().path.name}")
  }
}

And the application.conf:

manager {
  akka {
    actor {
      provider = "akka.cluster.ClusterActorRefProvider"
    }
    cluster {
      auto-down-unreachable-after = 20s
      seed-nodes = [
        "akka.tcp://[email protected]:2552"
      ]
      roles.1 = "manager"

    }
    remote.netty.tcp.port = 2552

  }
}

worker {
  akka {
    cluster {
      auto-down-unreachable-after = 20s
      seed-nodes = [
        "akka.tcp://[email protected]:2552"
      ]
      roles.1 = "worker"
    }
    remote.netty.tcp.port = 2554
    actor {
      provider = "akka.cluster.ClusterActorRefProvider"
    }
  }
}

The worker is initialized (I can see the state change [Start -> Oldest] message in the logs), but the message sent from the manager never arrives at the worker. It used to work fine when I created the singleton directly on the remote node, but now I want the manager to deploy it.

I also tried deploying it as a child of the manager (using context instead of context.system) and changing the singleton path to user/manager/singleton/worker, but that didn't work either.

I'm using Akka 2.3.11.

Edit: sbt file:

name := "MyProject"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies ++= Seq(
    "com.typesafe.akka" %% "akka-actor" % "2.3.11",
    "com.typesafe.akka" %% "akka-cluster" % "2.3.11",
    "joda-time" % "joda-time" % "2.0",
    "com.typesafe.akka" %% "akka-contrib" % "2.3.11"
)

Upvotes: 0

Views: 536

Answers (1)

thwiegan

Reputation: 2173

I played around with different ways of creating ClusterSingletonManagers, and I think deploying them remotely breaks something within the singleton pattern. I found a few indicators of this:

  • Since it is a remote deployment, the path of the ClusterSingletonManager on the worker node is /remote/akka.tcp/[email protected]:2552/user/worker. I don't think the library can or will handle this, since it expects /user/worker.

  • When sending the message from the manager node through the ClusterSingletonProxy, the log in DEBUG mode states "No singleton available, stashing message hello worker" and "Trying to identify singleton at akka.tcp://[email protected]:2552/user/worker/singleton" (which fails and retries). So it is looking for the singleton on the wrong node, since no manager is available there and the proxy is apparently not aware that the singleton is on the worker node.
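
To make the mismatch from the two points above concrete, here is a minimal sketch in plain Scala (string handling only, no Akka calls). The helper remoteDeployedPath and the host name manager-host are hypothetical and only stand in for the real address; the /remote/... layout is taken from the path quoted above.

```scala
// Illustration only: plain string handling, no Akka dependency.
object RemotePathSketch {
  // Hypothetical helper: the path a remote-deployed actor ends up with on the
  // target node. deployerAddress is the address of the node that started the
  // deployment (here the manager node); the host name is a placeholder.
  def remoteDeployedPath(protocol: String, deployerAddress: String, localPath: String): String =
    s"/remote/$protocol/$deployerAddress$localPath"

  def main(args: Array[String]): Unit = {
    // What the proxy was told to look for.
    val expectedByProxy = "/user/worker/singleton"
    // Where the singleton lives when its manager was deployed remotely.
    val actual =
      remoteDeployedPath("akka.tcp", "mySys@manager-host:2552", "/user/worker") + "/singleton"

    println(s"proxy looks up : $expectedByProxy")
    println(s"actor lives at : $actual")
    println(s"paths match    : ${expectedByProxy == actual}")
  }
}
```

The two strings differ, which matches the behaviour seen in the DEBUG log: the proxy keeps retrying a path where nothing is running.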

When creating the ClusterSingletonManager on the worker node directly everything works as expected.

You also had an issue with the naming of the ClusterSingletonManager. Your singletonName is worker, but the manager actor itself has no name. When you create the proxy you use the path /user/singleton/worker, but the path should be /user/{actorName}/{singletonName}. In my code I used worker as the actorName and singleton as the singletonName.
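
As a rough illustration of that naming rule, the proxy path can be thought of as being composed like this. The helper singletonPath is hypothetical (it is not part of Akka); it only shows which name goes where.

```scala
// Illustration only: shows how the two names combine into the proxy path.
object SingletonPathSketch {
  // Hypothetical helper: managerActorName is the name passed to actorOf when
  // creating the ClusterSingletonManager, singletonName is the value given
  // to ClusterSingletonManager.props.
  def singletonPath(managerActorName: String, singletonName: String): String =
    s"/user/$managerActorName/$singletonName"

  def main(args: Array[String]): Unit = {
    // Manager actor named "worker", singletonName = "singleton".
    println(singletonPath("worker", "singleton")) // prints /user/worker/singleton
  }
}
```

With an unnamed manager actor, as in the question, Akka generates the actor name, so there is no fixed value to put in the first position.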

So here's my working code:

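// Imports for Akka 2.3.x (the singleton classes live in akka-contrib)
import akka.actor.{Actor, ActorLogging, ActorSystem, PoisonPill, Props}
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.MemberUp
import akka.contrib.pattern.{ClusterSingletonManager, ClusterSingletonProxy}
import com.typesafe.config.ConfigFactory

object Manager extends App {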
  val sys = ActorSystem("mySys", ConfigFactory.load("application").getConfig("manager"))
  sys.actorOf(Props[Manager], "manager")
}

class Manager extends Actor with ActorLogging {
  override def receive: Receive = {
    case MemberUp(member) if member.address != Cluster(context.system).selfAddress =>
      context.actorOf(ClusterSingletonProxy.props(
        singletonPath = s"/user/worker/singleton",
        role = Some("worker")), name = "workerProxy") ! "hello worker"
  }
  override def preStart(): Unit = {
    Cluster(context.system).subscribe(self,classOf[MemberUp])
  }
}

object Worker extends App{
  val sys = ActorSystem("mySys", ConfigFactory.load("application").getConfig("worker"))

  sys.actorOf(ClusterSingletonManager.props(
    singletonProps = Props(classOf[Worker]),
    singletonName = "singleton",
    terminationMessage = PoisonPill,
    role = Some("worker")), name = "worker")
}

class Worker extends Actor with ActorLogging {

  override def receive: Receive = {
    case msg =>
      println(s"GOT MSG : $msg from : ${sender().path.name}")
  }
}

application.conf and build.sbt stayed the same.

EDIT

I got it to work by pointing the ClusterSingletonProxy at the actual path on the worker node (taking into account that it is a network path). I am not sure I would recommend this, since I still don't know whether the library is designed to support it, but it works at least in this minimal example:

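// Imports for Akka 2.3.x (the singleton classes live in akka-contrib)
import akka.actor.{Actor, ActorLogging, ActorSystem, Deploy, PoisonPill, Props}
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.MemberUp
import akka.contrib.pattern.{ClusterSingletonManager, ClusterSingletonProxy}
import akka.remote.RemoteScope
import com.typesafe.config.ConfigFactory

object Manager extends App {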
  val sys = ActorSystem("mySys", ConfigFactory.load("application").getConfig("manager"))
  sys.actorOf(Props[Manager], "manager")
}

class Manager extends Actor with ActorLogging {
  override def receive: Receive = {
    case MemberUp(member) if member.address != Cluster(context.system).selfAddress =>
      val ref = context.system.actorOf(ClusterSingletonManager.props(
        singletonProps = Props(classOf[Worker]),
        singletonName = "singleton",
        terminationMessage = PoisonPill,
        role = Some("worker")).withDeploy(Deploy(scope = RemoteScope(member.address))), name = "worker")

      context.actorOf(ClusterSingletonProxy.props(
        singletonPath = s"${ref.path.toStringWithoutAddress}/singleton", // /remote/akka.tcp/[email protected]:2552/user/worker/singleton
        role = Some("worker")), name = "workerProxy") ! "hello worker"
  }
  override def preStart(): Unit = {
    Cluster(context.system).subscribe(self,classOf[MemberUp])
  }
}

object Worker extends App{
  val sys = ActorSystem("mySys", ConfigFactory.load("application").getConfig("worker"))
}

class Worker extends Actor with ActorLogging {

  override def receive: Receive = {
    case msg =>
      println(s"GOT MSG : $msg from : ${sender().path.name}")
  }
}

Upvotes: 2
