Nicolas Joseph

Reputation: 1724

How to use the akka-http cachedHostConnectionPool inside an akka actor?

The Akka HTTP documentation for the high-level client request API says that we should not access actor state from inside a Future within an actor.

Instead, this should be the pattern used:

import akka.actor.{ Actor, ActorLogging }
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.scaladsl.ImplicitMaterializer
import akka.util.ByteString

class Myself extends Actor
  with ImplicitMaterializer
  with ActorLogging {

  import akka.pattern.pipe
  import context.dispatcher

  val http = Http(context.system)

  override def preStart() = {
    http.singleRequest(HttpRequest(uri = "http://akka.io"))
      .pipeTo(self)
  }

  def receive = {
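    case HttpResponse(StatusCodes.OK, headers, entity, _) =>
      // runFold returns a Future[ByteString], so log the body once it completes
      entity.dataBytes.runFold(ByteString(""))(_ ++ _).foreach { body =>
        log.info("Got response, body: " + body.utf8String)
      }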
    case HttpResponse(code, _, _, _) =>
      log.info("Request failed, response code: " + code)
  }

}

Should we do something similar when using cachedHostConnectionPool?

For example:

import akka.actor.{ Actor, ActorLogging }
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.scaladsl.{ ImplicitMaterializer, Sink, Source }

class Myself extends Actor
  with ImplicitMaterializer
  with ActorLogging {

  import akka.pattern.pipe
  import context.dispatcher

  var state = 10
  val http = Http(context.system)
  // apiEndpoint is assumed to be a Uri defined elsewhere
  val pool = http.cachedHostConnectionPoolTls[Int](apiEndpoint.authority.host.toString())

  override def preStart() = {
    Source.single(HttpRequest(uri = "http://akka.io") -> 42)
      .via(pool)
      .runWith(Sink.head)
      .pipeTo(self)
  }

  def receive = {
    case (res, ref) => ref match {
      case 42 => state -= 1 // Do something with the response 
    }
  }
}

If so, why do we need that? I could not find the explanation in the docs. If not, what is the correct pattern?

Thank you

Upvotes: 1

Views: 808

Answers (1)

cmbaxter

Reputation: 35453

As mentioned in my comment, you should use pipeTo to send the result of the Future back to yourself for processing if you need to mutate the internal state of the actor. If you don't, you risk concurrent modification of that internal state and lose the benefit of using an actor. The Future's post-completion logic is not executed as part of the actor's mailbox processing, so it could run at the same time as a message from the mailbox is being handled, which is where the potential concurrency issue comes from. That's why, if you need to mutate state after the Future completes, it's advised to pipe the result back to self.

Now, if there is no state to manage afterwards, then you don't have to use pipeTo, as concurrent modification of state won't be an issue.
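
To make the two cases concrete, here is a minimal sketch using only akka-actor. The message types Fetch, Fetched and GetRemaining, the Counter actor and its fetch method are hypothetical names made up for this illustration, and fetch is just a stand-in for any Future-returning call such as an HTTP request; this is not code from the Akka docs.

```scala
import akka.actor.{ Actor, ActorRef, Status }
import akka.pattern.pipe
import scala.concurrent.Future

// Hypothetical messages, used only for this illustration.
final case class Fetch(id: Int)
final case class Fetched(id: Int, body: String)
case object GetRemaining

class Counter extends Actor {
  import context.dispatcher // ExecutionContext for Future and pipeTo

  var remaining = 10 // mutable state: touch it only inside receive

  // Stand-in for an HTTP call; any Future-returning API behaves the same way.
  def fetch(id: Int): Future[Fetched] = Future(Fetched(id, s"body-$id"))

  def receive = {
    case Fetch(id) =>
      // UNSAFE alternative: fetch(id).foreach(_ => remaining -= 1)
      // The callback would run on a dispatcher thread, possibly while
      // receive is handling another message.
      fetch(id).pipeTo(self) // SAFE: the result arrives as a normal message

    case Fetched(_, _) =>
      remaining -= 1 // processed one message at a time

    case Status.Failure(_) =>
      () // pipeTo delivers a failed Future as Status.Failure; handle it here

    case GetRemaining =>
      // No state is mutated after the Future completes, so a plain callback
      // is fine. Read the state and capture sender() before creating the Future.
      val replyTo: ActorRef = sender()
      val snapshot = remaining
      Future(snapshot).foreach(replyTo ! _)
  }
}
```

The same reasoning should carry over to the connection pool case: what matters is whether the code that runs after the Future completes touches the actor's mutable state, not which client API produced the Future.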

Upvotes: 1
