YoK

Akka connection abort

I'm seeing strange behavior with Akka TCP IO when a connection is reset by an explicit call to TcpMessage.abort() from the handler (or when the handler terminates). The peer never receives a Tcp.ConnectionClosed event. Example:

Handler onReceive

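@Override
public void onReceive(Object msg) throws Exception {
    if (msg instanceof Tcp.ConnectionClosed) {
        // Covers Closed, Aborted, ConfirmedClosed, PeerClosed and ErrorClosed
        log.info("Server ConnectionClosed: {}", msg);
        getContext().stop(getSelf());
    } else if (msg instanceof Tcp.Received) {
        // The sender of Tcp.Received is the TCP connection actor
        log.info("Aborting connection");
        getSender().tell(TcpMessage.abort(), getSelf());
    } else {
        unhandled(msg);
    }
}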

Test code

new JavaTestKit(system) {{
    getSystem().actorOf(Props.create(Server.class, getRef()), "Server");
    Tcp.Bound bound = expectMsgClass(Tcp.Bound.class);

    ActorRef client = getSystem().actorOf(
            Props.create(Client.class, bound.localAddress(), getRef()), "Client");
    watch(client);
    expectMsgClass(Tcp.Connected.class);
    client.tell(new Message.Write(), getRef());

    expectTerminated(client);
}};

Log after execution

[INFO] [10/19/2013 22:13:22.730] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/Client] Client Connected
[INFO] [10/19/2013 22:13:22.736] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/Client] Sending Write to Handler
[INFO] [10/19/2013 22:13:22.767] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/Server/$a] Aborting connection
[INFO] [10/19/2013 22:13:22.774] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/Server/$a] Server ConnectionClosed: Aborted

java.lang.AssertionError: assertion failed: timeout (3 seconds) during expectMsg: Terminated Actor[akka://actorSystem/user/Client#423174850]

If I change TcpMessage.abort() to TcpMessage.close() in the handler, I get:

[INFO] [10/19/2013 22:17:06.243] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Client] Client Connected
[INFO] [10/19/2013 22:17:06.249] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Client] Sending Write to Handler
[INFO] [10/19/2013 22:17:06.278] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Server/$a] Aborting connection
[INFO] [10/19/2013 22:17:06.288] [actorSystem-akka.actor.default-dispatcher-6] [akka://actorSystem/user/Client] Client ConnectionClosed: PeerClosed
[INFO] [10/19/2013 22:17:06.288] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/Server/$a] Server ConnectionClosed: Closed

How can the server handler signal to the remote client that the connection was closed with an error? Everything works fine if I send Abort from the client to the server.
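To see what the two commands roughly correspond to on the wire, here is a minimal sketch using plain java.net sockets (not Akka). A normal close() sends FIN, which the peer reads as end of stream (what Akka reports as PeerClosed); closing with SO_LINGER enabled and a timeout of 0 sends RST, which the peer sees as a connection reset (what Akka reports as ErrorClosed, "An existing connection was forcibly closed by the remote host" on Windows). The class name CloseVsAbort and the observe helper are hypothetical, and the assumption that Akka's Abort works the same way underneath is mine, not something this code verifies.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;

public class CloseVsAbort {

    /**
     * Connects a client to a loopback server, lets the client write first,
     * then closes the server side normally (FIN) or abortively (RST).
     * Returns what the client observes on its next read: "EOF" or "RESET".
     */
    static String observe(boolean abort) {
        InetAddress lo = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, lo);
             Socket client = new Socket(lo, server.getLocalPort());
             Socket accepted = server.accept()) {

            // Client writes first, as in the question (Write before Abort/Close)
            byte[] request = "hello".getBytes(StandardCharsets.US_ASCII);
            client.getOutputStream().write(request);
            client.getOutputStream().flush();

            // Server consumes the whole request, like the handler's Tcp.Received
            InputStream in = accepted.getInputStream();
            byte[] buf = new byte[request.length];
            int n = 0;
            while (n < buf.length) {
                int r = in.read(buf, n, buf.length - n);
                if (r < 0) {
                    break;
                }
                n += r;
            }

            if (abort) {
                // SO_LINGER on with a 0 s timeout: close() drops the connection with RST
                accepted.setSoLinger(true, 0);
            }
            accepted.close(); // with the default linger setting this is an orderly FIN

            try {
                return client.getInputStream().read() == -1 ? "EOF" : "DATA";
            } catch (SocketException e) {
                return "RESET"; // typically "Connection reset"
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("close(): " + observe(false));
        System.out.println("abort(): " + observe(true));
    }
}
```

On Linux I would expect "EOF" for the normal close and "RESET" for the linger-0 close; the point is that the client only learns about the RST when it next reads from (or writes to) the socket.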

EDIT:

OK, I found the root of the problem: the Abort command fails only if I send a Write command before trying to abort. This happens on both the client and the server. Now I need to figure out why.

EDIT2:

It seems the abort is getting stuck. If I create a new client, the ErrorClosed finally reaches the first client, and sending the same Write command to the second client triggers an abort, which the second client receives as expected.

new JavaTestKit(system) {{
    getSystem().actorOf(Props.create(Server.class, getRef()), "Server");
    Tcp.Bound bound = expectMsgClass(Tcp.Bound.class);

    ActorRef client = getSystem().actorOf(
            Props.create(Client.class, bound.localAddress(), getRef()), "Client");
    watch(client);
    expectMsgClass(Tcp.Connected.class);
    client.tell(new Message.Write(), getRef());
    expectNoMsg();

    ActorRef client2 = getSystem().actorOf(
            Props.create(Client.class, bound.localAddress(), getRef()), "Client2");
    watch(client2);
    expectMsgClass(Tcp.Connected.class);

    expectTerminated(client);

    client2.tell(new Message.Write(), getRef());

    expectTerminated(client2);
    expectNoMsg();
}};

And the log:

[INFO] [10/20/2013 00:48:05.923] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Client] Client Connected
[INFO] [10/20/2013 00:48:05.929] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Client] Sending Write to Handler
[INFO] [10/20/2013 00:48:05.959] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Server/$a] Aborting connection, connection actor: [Actor[akka://actorSystem/system/IO-TCP/selectors/$a/2#1767183113]]
[INFO] [10/20/2013 00:48:05.966] [actorSystem-akka.actor.default-dispatcher-6] [akka://actorSystem/user/Server/$a] Server ConnectionClosed: Aborted
[INFO] [10/20/2013 00:48:08.930] [actorSystem-akka.actor.default-dispatcher-6] [akka://actorSystem/user/Client2] Client Connected
[INFO] [10/20/2013 00:48:08.934] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/Client] Client ConnectionClosed: ErrorClosed(An existing connection was forcibly closed by the remote host)
[INFO] [10/20/2013 00:48:08.936] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Client2] Sending Write to Handler
[INFO] [10/20/2013 00:48:08.937] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Server/$b] Aborting connection, connection actor: [Actor[akka://actorSystem/system/IO-TCP/selectors/$a/4#-598138943]]
[INFO] [10/20/2013 00:48:08.938] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Server/$b] Server ConnectionClosed: Aborted
[INFO] [10/20/2013 00:48:08.938] [actorSystem-akka.actor.default-dispatcher-7] [akka://actorSystem/user/Client2] Client ConnectionClosed: ErrorClosed(An existing connection was forcibly closed by the remote host)

EDIT3:

To make it a bit clearer, the question is: how can I inform my remote actor handler that the peer has crashed and closed the connection with ErrorClosed on its end? The last ErrorClosed message always gets stuck. It does not disappear; it just sits somewhere in the selector and is only pushed to the peer after I do another network operation (a write, or connecting another actor to the server). This only happens if I send a Write command before aborting. The behavior is OS-independent: I tried on both Windows and Linux machines with the same result.


Answers (1)

YoK

I did not find a solid solution to this problem, but I did a few things to minimize the overall impact:

  1. Do not explicitly use RST (this includes the Abort command). Always close the connection with FIN (Close), even in case of an error.
  2. Signal a connection close if an actor dies because of an error, so the connection can be closed properly.
  3. Use an asynchronous request-reply data transfer mechanism (ZMQ) for transferring large amounts of data.
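Points 1 and 2 can be sketched with plain java.net sockets rather than Akka. The names GracefulOnError, handle, serveOne and clientView are hypothetical: a failing handler is wrapped so that the connection is always closed in a finally block with a normal close() (FIN), never with SO_LINGER 0 (RST). In Akka terms this would roughly mean sending TcpMessage.close() to the connection actor from the handler's error path or postStop; that mapping is my assumption and is not shown here. One caveat the sketch accounts for: on Linux, for example, close() on a socket that still has unread received data sends RST instead of FIN, so the handler drains its input first.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;

public class GracefulOnError {

    /** Hypothetical handler: reads the whole request, then fails while processing it. */
    static void handle(Socket conn) throws IOException {
        InputStream in = conn.getInputStream();
        byte[] buf = new byte[256];
        while (in.read(buf) != -1) {
            // drain until the client half-closes, so no unread bytes remain
        }
        throw new IllegalStateException("processing failed");
    }

    /** Runs the handler and always closes with FIN, even when the handler throws. */
    static void serveOne(Socket conn) {
        try {
            handle(conn);
        } catch (IOException | RuntimeException e) {
            System.err.println("handler failed: " + e.getMessage());
        } finally {
            try {
                conn.close(); // default SO_LINGER: orderly close (FIN), not abort (RST)
            } catch (IOException ignored) {
                // nothing more can be done for this connection
            }
        }
    }

    /** Returns what the client sees after its request fails: "EOF" or "RESET". */
    static String clientView() {
        InetAddress lo = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, lo);
             Socket client = new Socket(lo, server.getLocalPort());
             Socket accepted = server.accept()) {
            client.getOutputStream().write("request".getBytes(StandardCharsets.US_ASCII));
            client.shutdownOutput(); // client sends FIN so the handler's drain loop ends
            serveOne(accepted);
            try {
                return client.getInputStream().read() == -1 ? "EOF" : "DATA";
            } catch (SocketException e) {
                return "RESET";
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(clientView());
    }
}
```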

A heartbeat actor could also be implemented, but that does not seem like a very good idea, especially if Akka IO selectors already run a heartbeat on the TCP socket themselves (do they?).
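For reference, the bookkeeping such a heartbeat actor would need is small. The sketch below is plain Java with hypothetical names (HeartbeatMonitor, onReceived, peerLooksDead) and an injectable clock so it can be tested deterministically; in an actor, the periodic check would be driven by scheduler ticks and a dead peer would be handled by closing the connection, neither of which is shown. The OS-level alternative is TCP keepalive (the SO_KEEPALIVE socket option), but on Linux the first keepalive probe is only sent after two hours of idleness by default, which is usually too slow for detecting a crashed peer.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical helper for a heartbeat actor: records when the peer was last heard
 * from and reports it as dead once nothing has arrived for longer than timeoutMillis.
 */
public class HeartbeatMonitor {
    private final long timeoutMillis;
    private final LongSupplier clockMillis; // e.g. () -> System.nanoTime() / 1_000_000
    private long lastSeenMillis;

    public HeartbeatMonitor(long timeoutMillis, LongSupplier clockMillis) {
        this.timeoutMillis = timeoutMillis;
        this.clockMillis = clockMillis;
        this.lastSeenMillis = clockMillis.getAsLong();
    }

    /** Call for every frame received from the peer, data or heartbeat. */
    public void onReceived() {
        lastSeenMillis = clockMillis.getAsLong();
    }

    /** Call on each periodic tick; true means the peer should be treated as gone. */
    public boolean peerLooksDead() {
        return clockMillis.getAsLong() - lastSeenMillis > timeoutMillis;
    }

    public static void main(String[] args) {
        long[] now = {0L};
        HeartbeatMonitor monitor = new HeartbeatMonitor(3000, () -> now[0]);
        now[0] = 2000;
        System.out.println(monitor.peerLooksDead()); // false: 2000 ms of silence
        monitor.onReceived();
        now[0] = 5001;
        System.out.println(monitor.peerLooksDead()); // true: 3001 ms since last frame
    }
}
```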
