Incerteza

Reputation: 34944

Sending a request to a server asynchronously with Akka

I want to make a request to a server asynchronously involving actors. Say I have 2 actors:

class SessionRetriever extends Actor {
  import SessionRetriever._

  def receive = {
    case Get =>
      val s = getSessionIdFromServer() // 1
      sender ! Result(s)               // 2
  }

  def getSessionIdFromServer(): String = { ... } // 3
}

object SessionRetriever {
  case object Get
  case class Result(s: String)
}

And

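class RequestSender extends Actor {
  val sActor = context actorOf Props[SessionRetriever]

  // `!` is fire-and-forget and returns Unit, so the session id cannot be read
  // from its result; request it here and handle the reply as a message.
  override def preStart(): Unit = sActor ! SessionRetriever.Get

  def receive = {
    // got the session id
    case SessionRetriever.Result(sesId) =>
      val res = sendRequestToServer(sesId)
      logToFile(res)

      context stop sActor
  }

  def sendRequestToServer(sessionId: String): String = { .... }
}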

My questions:

val s = getSessionIdFromServer() // 1
sender ! Result(s)               // 2

1) getSessionIdFromServer() makes a synchronous request to the server. I think it would be much better to make the request asynchronous, correct? Then it would return Future[String] instead of a plain String.

2) How do I make it asynchronous: by using an AsyncHttpClient (if I recall its name correctly), or by wrapping its synchronous body in Future { }?

3) Should I use a blocking { } block? If yes, where exactly: inside the method body, or at the call site, as in val s = blocking { getSessionIdFromServer() }?

P.S. I'd rather not use async { } and await { } at this point, because they are quite high-level functions and, after all, they are built on top of Futures.

Upvotes: 2

Views: 1252

Answers (3)

Arnaud Gourlay

Reputation: 4666

To make an asynchronous call with AsyncHttpClient, you can adapt the Java Future it returns into a Scala Future by using a Promise.

    import java.util.concurrent.Executor
    import scala.concurrent.{Future, Promise}
    import scala.util.control.NonFatal
    import com.ning.http.client.AsyncHttpClient

    object WebClient {

      private val client = new AsyncHttpClient

      case class BadStatus(status: Int) extends RuntimeException

      def get(url: String)(implicit exec: Executor): Future[String] = {
        val f = client.prepareGet(url).execute()
        val p = Promise[String]()
        // The listener runs on `exec` once the Java future is done, so f.get does not wait.
        f.addListener(new Runnable {
          def run(): Unit =
            try {
              val response = f.get
              if (response.getStatusCode < 400)
                p.success(response.getResponseBodyExcerpt(131072))
              else p.failure(BadStatus(response.getStatusCode))
            } catch {
              // A failed request makes f.get throw; fail the promise instead of leaving it pending.
              case NonFatal(e) => p.failure(e)
            }
        }, exec)
        p.future
      }

      def shutdown(): Unit = client.close()
    }

    object WebClientTest extends App {
      import scala.concurrent.ExecutionContext.Implicits.global
      // Print the body if the request succeeds, then close the client either way.
      WebClient get "http://www.google.com/" map println onComplete (_ => WebClient.shutdown())
    }

Then handle the completion of the future with a callback.

Credit for the code to the awesome reactive programming course at Coursera.
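
For readers who want to try the Promise technique without the ning library, here is a minimal sketch of the same idea using only the standard library. It assumes a java.util.concurrent.CompletableFuture as the Java-side future; the names PromiseBridge and toScala are made up for illustration and are not part of any library.

```scala
import java.util.concurrent.CompletableFuture
import java.util.function.BiConsumer
import scala.concurrent.{Future, Promise}

object PromiseBridge {
  // Hypothetical helper: completes a Scala Promise from the Java future's
  // completion callback and hands back the Promise's Future.
  def toScala[A](cf: CompletableFuture[A]): Future[A] = {
    val p = Promise[A]()
    cf.whenComplete(new BiConsumer[A, Throwable] {
      def accept(value: A, error: Throwable): Unit = {
        // Exactly one of value/error is meaningful: error is null on success.
        if (error != null) p.failure(error) else p.success(value)
      }
    })
    p.future
  }
}
```

The caller never blocks: the returned Future completes whenever the Java future does, on whichever thread completes it.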

Upvotes: 1

user1484819

Reputation: 919

You might try this non-blocking approach:

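import scala.concurrent.Future
import scala.util.{Failure, Success}
import context.dispatcher // inside the actor: ExecutionContext for onComplete

def receive = {
    case Get =>
      // assume getSessionIdFromServer() runs asynchronously
      val f: Future[String] = getSessionIdFromServer()
      val client = sender // capture now; sender must not be read inside the callback
      f onComplete {
        case Success(rep) => client ! Result(rep)
        case Failure(ex) => client ! Failed(ex) // Failed is a reply message you define yourself
      }
}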
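
A possible alternative to the hand-written onComplete callback, assuming classic Akka actors, is the pipe pattern from akka.pattern. The sketch below reuses the message names from the question; the body of getSessionIdFromServer() is a hypothetical placeholder that returns a fixed value, where a real one would call a non-blocking HTTP client.

```scala
import akka.actor.Actor
import akka.pattern.pipe
import scala.concurrent.Future

object SessionRetriever {
  case object Get
  case class Result(s: String)
}

class SessionRetriever extends Actor {
  import SessionRetriever._
  import context.dispatcher // ExecutionContext used by map and pipeTo

  def receive: Receive = {
    case Get =>
      // sender() is evaluated here, on the actor's thread, and passed to pipeTo
      // as a plain ActorRef. On success the recipient gets Result(s); on failure
      // it gets akka.actor.Status.Failure(ex).
      getSessionIdFromServer().map(s => Result(s)) pipeTo sender()
  }

  // Hypothetical placeholder for a non-blocking call to the server.
  def getSessionIdFromServer(): Future[String] = Future.successful("session-42")
}
```

With this variant there is no need for a separate Failed message, since pipeTo already wraps a failed future in Status.Failure.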

Upvotes: 2

vptheron

Reputation: 7476

1) If getSessionIdFromServer() is blocking, then you should execute it asynchronously from your receive function. Otherwise your actor will block on every request it receives, waiting for the new session id before it can process the next one.

2) Using a Future will "move" the blocking operation to a different thread. Your actor will not be blocked and can keep processing incoming requests, which is good, but you are still blocking a thread, which is not so great. Using AsyncHttpClient is a good idea. You can also explore other non-blocking HTTP clients, like Play's WS (web service) client.

3) I am not very familiar with blocking, so I am not sure I should advise anything here. From what I understand, it tells the thread pool that the operation is blocking and that it should spawn a temporary new thread to handle it, which avoids having all your workers blocked. Again, if you do that your actor will not be blocked, but you are still blocking a thread while getting the session from the server.
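
As a rough sketch of how Future { } and blocking { } from points 2 and 3 fit together, using only the Scala standard library and the global execution context: the names BlockingSketch and getSessionIdFromServerSync, the sleep, and the returned value are all made up for illustration and stand in for the synchronous call from the question.

```scala
import scala.concurrent.{Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global

object BlockingSketch {
  // Hypothetical stand-in for the synchronous getSessionIdFromServer() in the question.
  def getSessionIdFromServerSync(): String = {
    Thread.sleep(100) // simulates a slow, blocking HTTP call
    "session-42"
  }

  // Future { } runs the call on a pool thread instead of the caller's thread.
  // blocking { } marks the body as potentially blocking, so the global pool
  // may start an extra thread to compensate while this one waits.
  def getSessionIdFromServer(): Future[String] =
    Future {
      blocking {
        getSessionIdFromServerSync()
      }
    }
}
```

In this arrangement blocking { } sits inside the Future body, around the synchronous call. A thread is still tied up for the duration of the request; only a non-blocking client avoids that.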

To summarize: just use an async http client in getSessionIdFromServer if it is possible. Otherwise, use either Future{} or blocking.

Upvotes: 1
