Theswolf

Reputation: 61

Akka TCP request/response needs Thread.sleep

Good morning, I have a problem with basic Akka IO over TCP.

I have a basic implementation of the Client and Server as shown in the Akka documentation:

Client is https://github.com/akka/akka/blob/v2.5.20/akka-docs/src/test/scala/docs/io/IODocSpec.scala#L67-L103

The handler is [SimpleEchoHandler](https://github.com/akka/akka/blob/v2.5.20/akka-docs/src/test/scala/docs/io/EchoServer.scala#L227-L304), but the other handlers behave the same way.

I have a main test method that stops after the first connection to the server:

package core.september

import java.net.InetSocketAddress

import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
import akka.util.ByteString
import com.typesafe.config.ConfigFactory
import core.september.fastchain.network.Client

/**
 * @author ${user.name}
 */
object App {

  class ClientHandler extends Actor with ActorLogging {
    def receive = {
      case toLog ⇒ {
        log.debug("Client received "+ toLog.toString)
      }
        //Thread.sleep(200)
    }
  }


  def main(args : Array[String]) {
    val config = ConfigFactory.parseString("akka.loglevel = DEBUG")
    implicit val system = ActorSystem("EchoServer", config)

    var clientHand:ActorRef  = system.actorOf(Props(classOf[ClientHandler]))
    var address:InetSocketAddress = new InetSocketAddress("localhost",5080)

    var ackServer = system.actorOf(Props(classOf[EchoManager], classOf[SimpleEchoHandler],5080), "simple")
    var client:ActorRef = system.actorOf(Props(classOf[Client],address,clientHand))

    //Thread.sleep(200)

    client ! ByteString("echo")

    //Thread.sleep(200)

    client ! "close"
  }

}

Unless I uncomment the Thread.sleep calls in main, I can't see the output of the sent messages. Without the sleeps, the output is just:

[DEBUG] [02/07/2019 15:47:21.812] [EchoServer-akka.actor.default-dispatcher-4] [akka://EchoServer/system/IO-TCP/selectors/$a/0] Attempting connection to [localhost/127.0.0.1:5080]
[DEBUG] [02/07/2019 15:47:21.816] [EchoServer-akka.actor.default-dispatcher-4] [akka://EchoServer/system/IO-TCP/selectors/$a/0] Connection established to [localhost/127.0.0.1:5080]
[DEBUG] [02/07/2019 15:47:21.825] [EchoServer-akka.actor.default-dispatcher-3] [akka://EchoServer/user/$a] Client received Connected(localhost/127.0.0.1:5080,/127.0.0.1:54616)

I completely lose the ByteString message and the "close" message. My question is: why do I need to put the main thread to sleep for the other messages to show up? With Thread.sleep, the messages are logged correctly:

[DEBUG] [02/07/2019 15:53:55.988] [EchoServer-akka.actor.default-dispatcher-5] [akka://EchoServer/system/IO-TCP/selectors/$a/0] Attempting connection to [localhost/127.0.0.1:5080]
[DEBUG] [02/07/2019 15:53:55.999] [EchoServer-akka.actor.default-dispatcher-5] [akka://EchoServer/system/IO-TCP/selectors/$a/0] Connection established to [localhost/127.0.0.1:5080]
[DEBUG] [02/07/2019 15:53:56.011] [EchoServer-akka.actor.default-dispatcher-5] [akka://EchoServer/user/$a] Client received Connected(localhost/127.0.0.1:5080,/127.0.0.1:54712)
[DEBUG] [02/07/2019 15:53:56.157] [EchoServer-akka.actor.default-dispatcher-2] [akka://EchoServer/user/$a] Client received ByteString(101, 99, 104, 111)
[DEBUG] [02/07/2019 15:53:56.374] [EchoServer-akka.actor.default-dispatcher-4] [akka://EchoServer/user/$a] Client received connection closed

The Client actor implementation is:

package core.september.fastchain.network

import akka.actor.{ Actor, ActorRef, Props }
import akka.io.{ IO, Tcp }
import akka.util.ByteString
import java.net.InetSocketAddress

object Client {
  def props(remote: InetSocketAddress, replies: ActorRef) =
    Props(classOf[Client], remote, replies)
}

class Client(remote: InetSocketAddress, listener: ActorRef) extends Actor {

  import Tcp._
  import context.system

  import akka.io.Tcp

  /*if (listener == null) {
   listener = Tcp.get(context.system).manager
  }*/

  IO(Tcp) ! Connect(remote)

  def receive = {
    case CommandFailed(_: Connect) ⇒
      listener ! "connect failed"
      context stop self

    case c @ Connected(remote, local) ⇒
      listener ! c
      val connection = sender()
      connection ! Register(self)
      context become {
        case data: ByteString ⇒
          connection ! Write(data)
        case CommandFailed(w: Write) ⇒
          // O/S buffer was full
          listener ! "write failed"
        case Received(data) ⇒
          listener ! data
        case "close" ⇒
          connection ! Close
        case _: ConnectionClosed ⇒
          listener ! "connection closed"
          context stop self
      }
  }
}

Thank you very much.

Upvotes: 2

Views: 132

Answers (1)

Dima

Reputation: 40500

You gotta wait for the actor to process your messages before exiting the app. The easiest way is to use Akka's gracefulStop pattern:

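import akka.pattern.gracefulStop
import scala.concurrent.Await
import scala.concurrent.duration._
client ! ByteString("echo")
client ! "close"
// gracefulStop sends a PoisonPill; the future completes once the actor has terminated
Await.result(gracefulStop(client, 1.second), 2.seconds)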
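A related point worth checking: in the question's Client, ByteString and "close" are only handled after Connected has arrived and `context become` has run, so anything sent before that is unhandled in the initial `receive`. A minimal sketch of one way to avoid depending on timing, assuming it is acceptable to buffer early messages with Akka's `Stash` trait (the `StashingClient` name and the `connected` helper are made up for illustration, not part of the Akka docs sample):

```scala
import java.net.InetSocketAddress

import akka.actor.{ Actor, ActorRef, Stash }
import akka.io.{ IO, Tcp }
import akka.util.ByteString

// Hypothetical variant of the question's Client: same protocol, but messages
// that arrive before the connection is established are stashed, not dropped.
class StashingClient(remote: InetSocketAddress, listener: ActorRef)
    extends Actor with Stash {

  import Tcp._
  import context.system

  IO(Tcp) ! Connect(remote)

  def receive: Receive = {
    case CommandFailed(_: Connect) =>
      listener ! "connect failed"
      context.stop(self)

    case c: Connected =>
      listener ! c
      val connection = sender()
      connection ! Register(self)
      context.become(connected(connection))
      unstashAll() // replay everything that arrived while connecting

    case _ =>
      stash() // not connected yet: keep the message for later
  }

  def connected(connection: ActorRef): Receive = {
    case data: ByteString =>
      connection ! Write(data)
    case CommandFailed(_: Write) =>
      // O/S buffer was full
      listener ! "write failed"
    case Received(data) =>
      listener ! data
    case "close" =>
      connection ! Close
    case _: ConnectionClosed =>
      listener ! "connection closed"
      context.stop(self)
  }
}
```

It can be created the same way as in the question, e.g. `system.actorOf(Props(classOf[StashingClient], address, clientHand))`. Note that sending "close" right after "echo" may still close the connection before the echoed reply comes back, so whether the reply is logged can depend on timing.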

Upvotes: 2
