Reputation: 221
I can connect to a ServerSocket using Akka I/O, but when I close the socket I'm not seeing any sort of ConnectionClosed message returned.
The first test passes, but the second does not (with both 2.2.3 and 2.3.0-RC2):
import org.specs2.mutable._
import java.net._
import akka.actor.{Actor, ActorRef, ActorSystem}
import akka.io._
import akka.testkit.TestActorRef

object System {
  implicit val system = ActorSystem("SocketListenerTests")
}
import System._

class Listener extends Actor {
  val manager = IO(Tcp)
  private var _last: Any = null
  def last = _last
  def receive = {
    case a: Any => _last = a
  }
  def connect(addr: InetSocketAddress) { manager ! Tcp.Connect(addr) }
}

class AkkaTcpTests extends Specification {
  override def is = args(sequential = true) ^ super.is
  val socket = new ServerSocket(0)
  val addr = new InetSocketAddress("localhost", socket.getLocalPort)
  def pause { Thread.sleep(200) }
  val listener = TestActorRef[Listener].underlyingActor

  "Akka TCP I/O" should {
    "open connections" in {
      listener.connect(addr)
      pause
      listener.last must beAnInstanceOf[Tcp.Connected]
    }
    "notify when server sockets are closed" in {
      socket.close
      pause
      listener.last must beAnInstanceOf[Tcp.ConnectionClosed]
    }
  }
}
[error] 'Connected(localhost/127.0.0.1:59141,/127.0.0.1:54809)' is not an instance of 'akka.io.Tcp$ConnectionClosed' (AkkaTcpTests.scala:46)
Thanks for any help.
Upvotes: 1
Views: 665
Reputation: 331
Have a look at http://doc.akka.io/docs/akka/2.2.3/scala/io-tcp.html
Your listener has to register itself with the connection before it receives any further TCP messages:
In order to activate the new connection a Register message must be sent to the connection actor, informing that one about who shall receive data from the socket. Before this step is done the connection cannot be used, and there is an internal timeout after which the connection actor will shut itself down if no Register message is received.
In the receive block of the listener, add a case such as:
    // Connected and Register are members of Tcp (import akka.io.Tcp._)
    case c @ Connected(remote, local) =>
      val connection = sender
      connection ! Register(self)
      // change behaviour with become, notify someone, ...
Edit: As Roland pointed out, in the given scenario one also needs to call accept() on the ServerSocket, which returns a Socket, and call close() on that one in order to trigger a PeerClosed message (which is an instance of Tcp.ConnectionClosed). If you call close() on the ServerSocket instead, an error is thrown, as you have closed the server's socket. But usually you don't have to deal with this level when using Akka IO.
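Putting the pieces together, a minimal sketch of the question's Listener with the registration step added might look like the following. It assumes Akka 2.2.x/2.3.x as in the question. Importing context.system to supply the implicit ActorSystem is a choice made for this sketch, not something taken from the original code.

```scala
import java.net.InetSocketAddress
import akka.actor.Actor
import akka.io.{IO, Tcp}

class Listener extends Actor {
  // Assumption for this sketch: take the implicit ActorSystem from the
  // actor's own context instead of a global object.
  import context.system

  private var _last: Any = null
  def last = _last

  def connect(addr: InetSocketAddress): Unit =
    IO(Tcp) ! Tcp.Connect(addr)

  def receive = {
    case c: Tcp.Connected =>
      _last = c
      // The sender of Connected is the connection actor. Until it gets a
      // Register message, no further TCP events are delivered to us.
      sender ! Tcp.Register(self)
    case other =>
      // Includes Tcp.PeerClosed and the other Tcp.ConnectionClosed events.
      _last = other
  }
}
```

With this in place, and with the test calling socket.accept() and closing the returned Socket rather than the ServerSocket, last should end up holding a Tcp.PeerClosed once the pause has elapsed.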
Upvotes: 1
Reputation: 15472
Closing a ServerSocket does not terminate connections that were accepted by it. You will have to obtain the Socket by calling accept(), and then you can close that one to tear down the connection.
This is needed in addition to the answer given by Carsten.
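To see the difference without Akka in the picture, here is a small sketch using only java.net. The object name CloseDemo and the byte value 42 are made up for illustration. It closes the ServerSocket first, checks that the accepted connection still carries data, and only then closes the accepted Socket.

```scala
import java.net.{InetAddress, ServerSocket, Socket}

object CloseDemo {
  /** Returns (connection survived ServerSocket.close, client read() after Socket.close). */
  def run(): (Boolean, Int) = {
    val server = new ServerSocket(0) // any free port
    val client = new Socket(InetAddress.getLoopbackAddress, server.getLocalPort)
    val accepted = server.accept()   // server-side end of the connection

    // Closing the listening socket leaves the accepted connection alone.
    server.close()
    accepted.getOutputStream.write(42)
    accepted.getOutputStream.flush()
    val survived = client.getInputStream.read() == 42

    // Closing the accepted Socket is what the peer actually notices.
    accepted.close()
    val afterClose = client.getInputStream.read() // -1 means end of stream

    client.close()
    (survived, afterClose)
  }
}
```

The end-of-stream seen by the plain client here is the same event that Akka I/O reports to a registered handler as PeerClosed.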
Upvotes: 1