Reputation: 95278
I'm trying to set up a simple TCP server using akka actors that should allow multiple clients to be connected simultaneously. I reduced my problem to the following simple program:
package actorfail
import akka.actor._, akka.io._, akka.util._
import scala.collection.mutable._
import java.net._
case class Foo()
class ConnHandler(conn: ActorRef) extends Actor {
def receive = {
case Foo() => conn ! Tcp.Write(ByteString("foo\n"))
}
}
class Server(conns: ArrayBuffer[ActorRef]) extends Actor {
import context.system
println("Listing on 127.0.0.1:9191")
IO(Tcp) ! Tcp.Bind(self, new InetSocketAddress("127.0.0.1", 9191))
def receive = {
case Tcp.Connected(remote, local) =>
val handler = context.actorOf(Props(new ConnHandler(sender)))
sender ! Tcp.Register(handler)
conns.append(handler)
}
}
object Main {
def main(args: Array[String]) {
implicit val system = ActorSystem("Test")
val conns = new ArrayBuffer[ActorRef]()
val server = system.actorOf(Props(new Server(conns)))
while (true) {
println(s"Sending some foos")
for (c <- conns) c ! Foo()
Thread.sleep(1000)
}
}
}
It binds to localhost:9191 and accepts multiple connections, adding the connection handlers to a global array and periodically sending the string "foo"
to each connection. Now when I try to connect with more than one client simultaneously, only the first one gets the "foo"s. When I open a second connection, it doesn't get sent any foo's, rather I get the following type of log message:
Sending some foos
[INFO] [03/27/2015 21:24:07.331] [Test-akka.actor.default-dispatcher-6] [akka://Test/deadLetters] Message [akka.io.Tcp$Write] from Actor[akka://Test/user/$a/$b#-308726290] to Actor[akka://Test/deadLetters] was not delivered. [7] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
I understand that this would mean that the target actor to which we try to send the Tcp.Write
command is no longer accepting messages. But why is that? Can you help me understand the underlying issue? How can I make this work?
Upvotes: 3
Views: 2031
Reputation: 15472
There are two issues with the code presented above:
Before I elaborate, please consider reading the docs, here and here, this is all covered there.
ArrayBuffer is not thread-safe, but you pass it from the main routine to different actors who then independently (concurrently) modify it. This will lead to the loss of updates or to the corruption of the data structure itself. On the other hand, without proper synchronization it is not guaranteed that the main thread will ever see the modifications, since the compiler could in principle determine that the buffer does not change within the while
loop and optimize the code accordingly.
Instead of relying upon shared mutable state, actors only send messages. In this case the solution is to lift the while
loop into an actor (but scheduling a message to self
after a second instead of the blocking Thread.sleep(1000)
call). The connection handlers then only need to be passed the ActorRef
for this foo
sender actor, they’ll send it a message to register themselves and then that actor keeps the list of active connections inside it encapsulated scope. This has the benefit that you can use DeathWatch to also remove the connections when they terminate.
The problematic code: Props(new ConnHandler(sender))
Props are constructed from an actor factory, which is taken as a by-name argument in this case; the whole new
expression is evaluated at a later time, whenever such an actor is being initialized—possibly on a different thread. This means that sender
is also evaluated later, from that context of execution, and therefore it will likely be deadLetters
(if the parent actor is not currently running—and if it is, sender
will likely point to the wrong actor altogether).
The solution here is documented here.
Upvotes: 5