Reputation: 1078
I'm trying to send a message to a singleton actor that was deployed on a remote node by another actor.
This is the manager, which waits for a MemberUp event, deploys the Worker
actor on that node as a cluster singleton, and then sends the singleton a message:
object Manager extends App {
  val sys = ActorSystem("mySys", ConfigFactory.load("application").getConfig("manager"))
  sys.actorOf(Props[Manager], "manager")
}

class Manager extends Actor with ActorLogging {
  override def receive: Receive = {
    case MemberUp(member) if member.address != Cluster(context.system).selfAddress =>
      context.system.actorOf(ClusterSingletonManager.props(
        singletonProps = Props(classOf[Worker]),
        singletonName = "worker",
        terminationMessage = End,
        role = Some("worker")).withDeploy(Deploy(scope = RemoteScope(member.address))))
      context.actorOf(ClusterSingletonProxy.props(
        singletonPath = "/user/singleton/worker",
        role = Some("worker")), "worker") ! "hello"
  }

  override def preStart(): Unit = {
    Cluster(context.system).subscribe(self, classOf[MemberUp])
  }
}
This is the worker:
object Worker extends App {
  ActorSystem("mySys", ConfigFactory.load("application").getConfig("worker"))
}

class Worker extends Actor with ActorLogging {
  override def receive: Receive = {
    case msg =>
      println(s"GOT MSG : $msg from : ${sender().path.name}")
  }
}
And the application.conf:
manager {
  akka {
    actor {
      provider = "akka.cluster.ClusterActorRefProvider"
    }
    cluster {
      auto-down-unreachable-after = 20s
      seed-nodes = [
        "akka.tcp://[email protected]:2552"
      ]
      roles.1 = "manager"
    }
    remote.netty.tcp.port = 2552
  }
}

worker {
  akka {
    cluster {
      auto-down-unreachable-after = 20s
      seed-nodes = [
        "akka.tcp://[email protected]:2552"
      ]
      roles.1 = "worker"
    }
    remote.netty.tcp.port = 2554
    actor {
      provider = "akka.cluster.ClusterActorRefProvider"
    }
  }
}
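As an aside, the indexed roles.1 form is the style normally used for system-property overrides (-Dakka.cluster.roles.1=manager); written inline in the file it should be equivalent to a plain list:

cluster {
  roles = ["manager"]   # same effect as roles.1 = "manager"
}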
The worker is initialized (I can see the [Start -> Oldest] state-change
message in the logs), but the message sent from the manager never arrives at the worker. It used to work fine when I was deploying the singleton on the remote node itself, but now I want the manager to deploy it.
I also tried deploying it as a child of the manager (using context instead of context.system) and changing the singleton path to /user/manager/singleton/worker, but that didn't work either.
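Roughly, that attempt looked like this (a reconstructed sketch; the child name "singleton" and proxy name "workerProxy" are illustrative):

// same Props as before, but created as a named child of the manager actor
context.actorOf(ClusterSingletonManager.props(
  singletonProps = Props(classOf[Worker]),
  singletonName = "worker",
  terminationMessage = End,
  role = Some("worker")).withDeploy(Deploy(scope = RemoteScope(member.address))), "singleton")
// proxy path adjusted to the child's location
context.actorOf(ClusterSingletonProxy.props(
  singletonPath = "/user/manager/singleton/worker",
  role = Some("worker")), "workerProxy") ! "hello"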
I'm using Akka 2.3.11.
Edit: sbt file:
name := "MyProject"

version := "1.0"

scalaVersion := "2.10.5"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.3.11",
  "com.typesafe.akka" %% "akka-cluster" % "2.3.11",
  "joda-time" % "joda-time" % "2.0",
  "com.typesafe.akka" %% "akka-contrib" % "2.3.11"
)
Upvotes: 0
Views: 536
Reputation: 2173
So I played around a bit with the different options for creating ClusterSingletonManagers, and I think deploying them remotely breaks something within the singleton pattern. I have gathered a few indicators for this:
- Since it is a remote deployment, the path of the ClusterSingletonManager on the worker node is /remote/akka.tcp/[email protected]:2552/user/worker. I don't think the library can or will handle this, since it expects /user/worker.
- When trying to send the message from the master node using the ClusterSingletonProxy, the DEBUG log (enabled as shown after this list) states "No singleton available, stashing message hello worker" and "Trying to identify singleton at akka.tcp://[email protected]:2552/user/worker/singleton" (which fails and retries). It is looking for the singleton on the wrong node: no manager is available there, and it is apparently not aware that the singleton is on the worker node.
- When creating the ClusterSingletonManager on the worker node directly, everything works as expected.
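To see those proxy messages yourself, raise the log level of the manager's ActorSystem, e.g.:

akka {
  loglevel = "DEBUG"
}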
You also had an issue with your naming of the manager. Your singletonName is worker, and your manager itself (the actor) does not have any name. When you create the proxy you use the path /user/singleton/worker, but the path should be /user/{actorName}/{singletonName}. So in my code I used worker as the actorName and singleton as the singletonName.
So here's my working code:
object Manager extends App {
  val sys = ActorSystem("mySys", ConfigFactory.load("application").getConfig("manager"))
  sys.actorOf(Props[Manager], "manager")
}

class Manager extends Actor with ActorLogging {
  override def receive: Receive = {
    case MemberUp(member) if member.address != Cluster(context.system).selfAddress =>
      context.actorOf(ClusterSingletonProxy.props(
        singletonPath = "/user/worker/singleton",
        role = Some("worker")), name = "workerProxy") ! "hello worker"
  }

  override def preStart(): Unit = {
    Cluster(context.system).subscribe(self, classOf[MemberUp])
  }
}

object Worker extends App {
  val sys = ActorSystem("mySys", ConfigFactory.load("application").getConfig("worker"))
  sys.actorOf(ClusterSingletonManager.props(
    singletonProps = Props(classOf[Worker]),
    singletonName = "singleton",
    terminationMessage = PoisonPill,
    role = Some("worker")), name = "worker")
}

class Worker extends Actor with ActorLogging {
  override def receive: Receive = {
    case msg =>
      println(s"GOT MSG : $msg from : ${sender().path.name}")
  }
}
application.conf and build.sbt stayed the same.
EDIT

Got it to work by referencing the ClusterSingletonProxy with the actual path on the worker node (taking into account that it is a remote path). I am not sure I would recommend this, since I am still not sure the library is designed to be able to do it, but it works, at least in this minimal example:
object Manager extends App {
  val sys = ActorSystem("mySys", ConfigFactory.load("application").getConfig("manager"))
  sys.actorOf(Props[Manager], "manager")
}

class Manager extends Actor with ActorLogging {
  override def receive: Receive = {
    case MemberUp(member) if member.address != Cluster(context.system).selfAddress =>
      val ref = context.system.actorOf(ClusterSingletonManager.props(
        singletonProps = Props(classOf[Worker]),
        singletonName = "singleton",
        terminationMessage = PoisonPill,
        role = Some("worker")).withDeploy(Deploy(scope = RemoteScope(member.address))), name = "worker")
      context.actorOf(ClusterSingletonProxy.props(
        singletonPath = s"${ref.path.toStringWithoutAddress}/singleton", // /remote/akka.tcp/[email protected]:2552/user/worker/singleton
        role = Some("worker")), name = "workerProxy") ! "hello worker"
  }

  override def preStart(): Unit = {
    Cluster(context.system).subscribe(self, classOf[MemberUp])
  }
}

object Worker extends App {
  val sys = ActorSystem("mySys", ConfigFactory.load("application").getConfig("worker"))
}

class Worker extends Actor with ActorLogging {
  override def receive: Receive = {
    case msg =>
      println(s"GOT MSG : $msg from : ${sender().path.name}")
  }
}
Upvotes: 2