Reputation: 127
I understand how to make a message based non-blocking application in akka, and can easily mock up examples that perform concurrent operations and pass back the aggregated results in a message. Where I have difficulty is understanding what my non-blocking options are when my application has to respond to an HTTP request. The goal is to receive a request and immediately hand it over to a local or remote actor to do the work, which in turn will hand it off to get a result that could take some time. Unfortunatly under this model, I don't understand how I could express this with a non-blocking series of "tells" rather than blocking "asks". If at any point in the chain I use a tell, I no longer have a future to use as the eventual response content (required by the http framework interface which in this case is finagle - but that is not important). I understand the request is on its own thread, and my example is quite contrived, but just trying to understand my design options.
In summary, If my contrived example below can be reworked to block less I very much love to understand how. This is my first use of akka since some light exploration a year+ ago, and in every article, document, and talk I have viewed says not to block for services.
Conceptual answers may be helpful but may also be the same as what I have already read. Working/Editing my example would likely be key to my understanding of the exact problem I am attempting to solve. If the current example is generally what needs to be done that confirmation is helpful too, so I don't search for magic that does not exist.
Note The following aliases: import com.twitter.util.{Future => TwitterFuture, Await => TwitterAwait}
object Server {
val system = ActorSystem("Example-System")
implicit val timeout = Timeout(1 seconds)
implicit def scalaFuture2twitterFuture[T](scFuture: Future[T]): TwitterFuture[T] = {
val promise = TwitterPromise[T]
scFuture onComplete {
case Success(result) ⇒ promise.setValue(result)
case Failure(failure) ⇒ promise.setException(failure)
}
promise
}
val service = new Service[HttpRequest, HttpResponse] {
def apply(req: HttpRequest): TwitterFuture[HttpResponse] = req.getUri match {
case "/a/b/c" =>
val w1 = system.actorOf(Props(new Worker1))
val r = w1 ? "take work"
val response: Future[HttpResponse] = r.mapTo[String].map { c =>
val resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)
resp.setContent(ChannelBuffers.copiedBuffer(c, CharsetUtil.UTF_8))
resp
}
response
}
}
//val server = Http.serve(":8080", service); TwitterAwait.ready(server)
class Worker1 extends Actor with ActorLogging {
def receive = {
case "take work" =>
val w2 = context.actorOf(Props(new Worker2))
pipe (w2 ? "do work") to sender
}
}
class Worker2 extends Actor with ActorLogging {
def receive = {
case "do work" =>
//Long operation...
sender ! "The Work"
}
}
def main(args: Array[String]) {
val r = service.apply(
com.twitter.finagle.http.Request("/a/b/c")
)
println(TwitterAwait.result(r).getContent.toString(CharsetUtil.UTF_8)) // prints The Work
}
}
Thanks in advance for any guidance offered!
Upvotes: 8
Views: 1803
Reputation: 139028
You can avoid sending a future as a message by using the pipe pattern—i.e., in Worker1
you'd write:
pipe(w2 ? "do work") to sender
Instead of:
sender ! (w2 ? "do work")
Now r
will be a Future[String]
instead of a Future[Future[String]]
.
Update: the pipe
solution above is a general way to avoid having your actor respond with a future. As Viktor points out in a comment below, in this case you can take your Worker1
out of the loop entirely by telling Worker2
to respond directly to the actor that it (Worker1
) got the message from:
w2.tell("do work", sender)
This won't be an option if Worker1
is responsible for operating on the response from Worker2
in some way (by using map
on w2 ? "do work"
, combining multiple futures with flatMap
or a for
-comprehension, etc.), but if that's not necessary, this version is cleaner and more efficient.
That kills one Await.result
. You can get rid of the other by writing something like the following:
val response: Future[HttpResponse] = r.mapTo[String].map { c =>
val resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)
resp.setContent(ChannelBuffers.copiedBuffer(c, CharsetUtil.UTF_8))
resp
}
Now you just need to turn this Future
into a TwitterFuture
. I can't tell you off the top of my head exactly how to do this, but it should be fairly trivial, and definitely doesn't require blocking.
Upvotes: 5
Reputation: 35443
You definitely don't have to block at all here. First, update your import for the twitter stuff to:
import com.twitter.util.{Future => TwitterFuture, Await => TwitterAwait, Promise => TwitterPromise}
You will need the twitter Promise
as that's the impl of Future
you will return from the apply
method. Then, follow what Travis Brown said in his answer so your actor is responding in such a way that you do not have nested futures. Once you do that, you should be able to change your apply
method to something like this:
def apply(req: HttpRequest): TwitterFuture[HttpResponse] = req.getUri match {
case "/a/b/c" =>
val w1 = system.actorOf(Props(new Worker1))
val r = (w1 ? "take work").mapTo[String]
val prom = new TwitterPromise[HttpResponse]
r.map(toResponse) onComplete{
case Success(resp) => prom.setValue(resp)
case Failure(ex) => prom.setException(ex)
}
prom
}
def toResponse(c:String):HttpResponse = {
val resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)
resp.setContent(ChannelBuffers.copiedBuffer(c, CharsetUtil.UTF_8))
resp
}
This probably needs a little more work. I didn't set it up in my IDE, so I can't guarantee you it compiles, but I believe the idea to be sound. What you return from the apply
method is a TwitterFuture
that is not yet completed. It will be completed when the future from the actor ask (?) is done and that's happing via a non-blocking onComplete
callback.
Upvotes: 0