Grant

Reputation: 221

Akka 2.2.3 TCP client not reporting ConnectionClosed

I can connect to a ServerSocket using Akka I/O, but when I close the socket I'm not seeing any sort of ConnectionClosed message returned.

The first test passes, but the second does not (with both 2.2.3 and 2.3.0-RC2):

import org.specs2.mutable._

import java.net._

import akka.actor.{Actor, ActorRef, ActorSystem}
import akka.io._
import akka.testkit.TestActorRef

object System {
  implicit val system = ActorSystem("SocketListenerTests")
}
import System._

class Listener extends Actor {
  val manager = IO(Tcp)

  private var _last: Any = null
  def last = _last

  def receive = {
    case a: Any => _last = a
  }

  def connect(addr: InetSocketAddress) { manager ! Tcp.Connect(addr) }
}

class AkkaTcpTests extends Specification {
  override def is = args(sequential = true) ^ super.is  

  val socket = new ServerSocket(0)
  val addr = new InetSocketAddress("localhost", socket.getLocalPort)

  def pause { Thread.sleep(200) }

  val listener = TestActorRef[Listener].underlyingActor

  "Akka TCP I/O" should {
    "open connections" in {
      listener.connect(addr)
      pause
      listener.last must beAnInstanceOf[Tcp.Connected]
    }
    "notify when server sockets are closed" in {
      socket.close
      pause
      listener.last must beAnInstanceOf[Tcp.ConnectionClosed]
    }
  }
}

[error] 'Connected(localhost/127.0.0.1:59141,/127.0.0.1:54809)' is not an instance of 'akka.io.Tcp$ConnectionClosed' (AkkaTcpTests.scala:46)

Thanks for any help.

Upvotes: 1

Views: 665

Answers (2)

Carsten Saathoff

Reputation: 331

Have a look at http://doc.akka.io/docs/akka/2.2.3/scala/io-tcp.html

Your listener has to register itself with the connection before it receives any further TCP messages:

In order to activate the new connection a Register message must be sent to the connection actor, informing that one about who shall receive data from the socket. Before this step is done the connection cannot be used, and there is an internal timeout after which the connection actor will shut itself down if no Register message is received.

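In the receive block of the listener, add a case like the following ahead of the catch-all case, so that it actually matches (the unqualified names assume import akka.io.Tcp._):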

case c @ Connected(remote, local) =>
  val connection = sender
  connection ! Register(self)
  // change behaviour with become, notify someone, ...

Edit: As Roland pointed out, in the given scenario you also need to call accept() on the ServerSocket, which returns a Socket, and call close() on that one in order to trigger a PeerClosed message (which is an instance of Tcp.ConnectionClosed). If you call close() on the ServerSocket, an error is thrown because you closed the server's connection. But usually you don't have to deal with this level when using Akka IO.

Upvotes: 1

Roland Kuhn

Reputation: 15472

Closing a ServerSocket does not terminate connections which were accepted by it. You have to obtain the Socket by calling accept(), and then you can close that one to tear down the connection.

This is needed in addition to the answer given by Carsten.
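The socket-level behaviour can be checked without Akka at all. The following is a minimal sketch using only java.net; the object name ServerSocketCloseDemo, the method run and the 500 ms read timeout are made up for illustration and are not part of the question's code. It accepts the connection first, then closes the ServerSocket and finally the accepted Socket, observing the client side after each step.

```scala
import java.net.{InetAddress, InetSocketAddress, ServerSocket, Socket, SocketTimeoutException}

// Hypothetical demo object, not taken from the question.
object ServerSocketCloseDemo {

  /** Returns (connection still open after ServerSocket.close,
    *          end-of-stream seen after closing the accepted Socket). */
  def run(): (Boolean, Boolean) = {
    val loopback = InetAddress.getLoopbackAddress
    val server   = new ServerSocket(0, 50, loopback) // port 0: any free port
    val client   = new Socket()
    client.connect(new InetSocketAddress(loopback, server.getLocalPort))
    client.setSoTimeout(500) // arbitrary: reads give up after 500 ms

    // The server-side end of this particular connection.
    val accepted = server.accept()
    try {
      // Step 1: close only the listening socket.
      server.close()
      val stillOpen =
        try { client.getInputStream.read(); false }    // data or EOF: not expected here
        catch { case _: SocketTimeoutException => true } // nothing arrived: still connected

      // Step 2: close the accepted socket; the client should now see end-of-stream.
      accepted.close()
      val sawEof = client.getInputStream.read() == -1

      (stillOpen, sawEof)
    } finally {
      // close() is idempotent, so repeating it here is harmless.
      accepted.close()
      client.close()
      server.close()
    }
  }
}
```

In the Akka test from the question, the second step is the one that should surface as a Tcp.ConnectionClosed event, provided the listener has sent Register as described in Carsten's answer.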

Upvotes: 1
