Niklas B.

Reputation: 95278

Handling multiple TCP connections with Akka Actors

I'm trying to set up a simple TCP server using Akka actors that should allow multiple clients to be connected simultaneously. I reduced my problem to the following simple program:

package actorfail
import akka.actor._, akka.io._, akka.util._
import scala.collection.mutable._
import java.net._

case class Foo()

class ConnHandler(conn: ActorRef) extends Actor {
  def receive = {
    case Foo() => conn ! Tcp.Write(ByteString("foo\n"))
  }
}

class Server(conns: ArrayBuffer[ActorRef]) extends Actor {
  import context.system
  println("Listening on 127.0.0.1:9191")
  IO(Tcp) ! Tcp.Bind(self, new InetSocketAddress("127.0.0.1", 9191))
  def receive = {
    case Tcp.Connected(remote, local) =>
      val handler = context.actorOf(Props(new ConnHandler(sender)))
      sender ! Tcp.Register(handler)
      conns.append(handler)
  }
}

object Main {
  def main(args: Array[String]) {
    implicit val system = ActorSystem("Test")
    val conns = new ArrayBuffer[ActorRef]()
    val server = system.actorOf(Props(new Server(conns)))
    while (true)  {
      println(s"Sending some foos")
      for (c <- conns) c ! Foo()
      Thread.sleep(1000)
    }
  }
}

It binds to localhost:9191 and accepts multiple connections, adding the connection handlers to a global array and periodically sending the string "foo" to each connection. When I connect with more than one client simultaneously, only the first one gets the "foo"s. A second connection is not sent any; instead I get the following type of log message:

Sending some foos
[INFO] [03/27/2015 21:24:07.331] [Test-akka.actor.default-dispatcher-6] [akka://Test/deadLetters] Message [akka.io.Tcp$Write] from Actor[akka://Test/user/$a/$b#-308726290] to Actor[akka://Test/deadLetters] was not delivered. [7] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

I understand that this would mean that the target actor to which we try to send the Tcp.Write command is no longer accepting messages. But why is that? Can you help me understand the underlying issue? How can I make this work?

Upvotes: 3

Views: 2031

Answers (1)

Roland Kuhn

Reputation: 15472

There are two issues with the code presented above:

  • sending mutable state in actor messages and mutating it in a non-threadsafe fashion
  • including unstable references in Props

Before I elaborate, please consider reading the docs (here and here); this is all covered there.

Mutable Messages

ArrayBuffer is not thread-safe, but you pass it from the main routine to different actors who then independently (concurrently) modify it. This will lead to the loss of updates or to the corruption of the data structure itself. On the other hand, without proper synchronization it is not guaranteed that the main thread will ever see the modifications, since the compiler could in principle determine that the buffer does not change within the while loop and optimize the code accordingly.

Instead of relying on shared mutable state, actors should only send messages. In this case the solution is to lift the while loop into an actor, scheduling a message to self after a second instead of calling the blocking Thread.sleep(1000). The connection handlers then only need to be passed the ActorRef for this foo sender actor; they send it a message to register themselves, and that actor keeps the list of active connections inside its encapsulated scope. This has the added benefit that you can use DeathWatch to remove connections when they terminate.
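The design described above could be sketched roughly as follows. This is only an illustration, assuming classic Akka actors (2.3 or later); the names FooSender, Register, Tick and the interval parameter are hypothetical and do not appear in the question.

```scala
import akka.actor._
import akka.io.Tcp
import akka.util.ByteString
import scala.concurrent.duration._

case class Foo()
case object Register // hypothetical: sent by a handler to announce itself
case object Tick     // hypothetical: periodic trigger the actor sends to itself

class FooSender(interval: FiniteDuration = 1.second) extends Actor {
  import context.dispatcher // execution context used by the scheduler

  // Only this actor touches the set, so no synchronization is needed.
  private var handlers = Set.empty[ActorRef]

  private def scheduleTick(): Unit =
    context.system.scheduler.scheduleOnce(interval, self, Tick)

  override def preStart(): Unit = scheduleTick()

  def receive = {
    case Register =>
      handlers += sender()
      context.watch(sender()) // DeathWatch: Terminated arrives when it stops
    case Terminated(ref) =>
      handlers -= ref
    case Tick =>
      handlers.foreach(_ ! Foo())
      scheduleTick() // replaces the blocking Thread.sleep(1000) loop
  }
}

class ConnHandler(conn: ActorRef, fooSender: ActorRef) extends Actor {
  // Register with the foo sender instead of being appended to a shared buffer.
  override def preStart(): Unit = fooSender ! Register

  def receive = {
    case Foo() => conn ! Tcp.Write(ByteString("foo\n"))
    case _: Tcp.ConnectionClosed => context.stop(self)
  }
}
```

With this arrangement the main method would only need to start the FooSender and the Server, and the Server would pass the FooSender's ActorRef to every ConnHandler it creates.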

Unstable References in Props

The problematic code: Props(new ConnHandler(sender))

Props are constructed from an actor factory, which in this case is taken as a by-name argument: the whole new expression is evaluated later, whenever such an actor is being initialized, possibly on a different thread. This means that sender is also evaluated later, from that context of execution. If the parent actor is not processing a message at that moment, sender will likely be deadLetters; if it is, sender will likely point to the wrong actor altogether.
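The effect of by-name evaluation can be reproduced without Akka. The following is a minimal sketch in plain Scala; Factory and currentSender are hypothetical stand-ins for Props and for an actor's sender, not Akka APIs.

```scala
object ByNameDemo {
  // Stand-in for an actor's `sender`: a value that changes over time.
  var currentSender: String = "client-1"

  // Hypothetical stand-in for Props: it stores the by-name expression
  // and only evaluates it when build() is called.
  final class Factory[A](create: => A) {
    def build(): A = create
  }

  def run(): (String, String) = {
    // The expression `currentSender` is captured, not its current value.
    val unstable = new Factory(currentSender)

    // Reading into a local val first captures the value as it is now.
    val stable = {
      val s = currentSender
      new Factory(s)
    }

    // The context changes before either factory is actually run.
    currentSender = "deadLetters"

    (unstable.build(), stable.build())
  }
}
```

Calling ByNameDemo.run() yields "deadLetters" from the first factory and "client-1" from the second, which mirrors what happens to the second connection's handler in the question.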

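The solution is documented here. In short, read sender into a local val while handling the Tcp.Connected message and pass that val to the ConnHandler constructor, so that the factory closes over a fixed reference.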
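Applied to the code from the question, the fix might look like the sketch below. It assumes classic Akka 2.3 or later (where sender() is written with parentheses), the local name connection is hypothetical, and the shared ArrayBuffer is left out because it is a separate problem.

```scala
import java.net.InetSocketAddress
import akka.actor._
import akka.io.{IO, Tcp}
import akka.util.ByteString

case class Foo()

class ConnHandler(conn: ActorRef) extends Actor {
  def receive = {
    case Foo() => conn ! Tcp.Write(ByteString("foo\n"))
  }
}

class Server extends Actor {
  import context.system
  IO(Tcp) ! Tcp.Bind(self, new InetSocketAddress("127.0.0.1", 9191))

  def receive = {
    case Tcp.Connected(remote, local) =>
      // Evaluate sender() now, while this message is being processed.
      val connection = sender()
      // The by-name factory closes over the val, not over sender().
      val handler = context.actorOf(Props(new ConnHandler(connection)))
      connection ! Tcp.Register(handler)
  }
}
```

An alternative with the same effect is Props(classOf[ConnHandler], connection), which takes its constructor arguments as ordinary, eagerly evaluated values.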

Upvotes: 5
