Yannick Chaze
Yannick Chaze

Reputation: 576

How to handle multiple Promises in an (akka) Actor?

I have an Akka actor responsible of handling http calls. I use scala dispatch to send multiple HTTP requests over an API:

urls.foreach { u
  val service = url(u)
  val promise = Http(service OK as.String).either
  for(p <- promise)
  {
     p match
     {
       case Left(error) =>
         faultHandler(error)
       case Right(result) =>
         resultHandler(result)
     }
  }

In the resultHandlerfunction, I increment an instance variable nbOfResults and compare to the number of calls I have done.

def resultHandler(result:String)
{
  this.nbOfResults++
  ...
  if(nbOfResults == nbOfCalls)
    // Do something
}

Is it safe ? May the nbOfResultsvaraible be accessed at the same time if two calls return their results simultaneously ?

For now, I believed that the actor is more or less equivalent to a thread and therefore the callback functions are not executed concurrently. Is it correct ?

Upvotes: 5

Views: 1212

Answers (4)

idonnie
idonnie

Reputation: 1703

val nbOfResults = new java.util.concurrent.atomic.AtomicInteger(nbOfCalls)

// After particular call was ended    
if (nbOfResults.decrementAndGet <= 0) {
  // Do something
}

[EDIT] Removed old answer with AtomicReference CAS - while(true), compareAndSet, etc

Upvotes: 1

mravey
mravey

Reputation: 4500

Here is a variant of Alexey Romanov response using only dispatch :

//Promises will be of type Array[Promise[Either[Throwable, String]]]
val promises = urls.map { u =>
    val service = url(u)

    Http(service OK as.String).either
}

//Http.promise.all transform an Iterable[Promise[A]] into Promise[Iterable[A]]
//So listPromise is now of type Promise[Array[Either[Throwable, String]]]
val listPromise = Http.promise.all(promises)

for (results <- listPromise) {
    //Here results is of type Array[Either[Throwable, String]]

    results foreach { result =>
        result match {
            Left(error) => //Handle error
            Right(response) => //Handle response
        }
    }
}

Upvotes: 3

Marius Danila
Marius Danila

Reputation: 10401

I agree with Alexey Romanov on his answer. Whatever way you choose to synchronize your http requests beware of the way your are processing the promises completion. Your intuition is correct in that concurrent access may appear on the state of the actor. The better way to handle this would be to do something like this:

def resultHandler(result: String) {
    //on completion we are sending the result to the actor who triggered the call
    //as a message
    self ! HttpComplete(result)
}

and in the actor's receive function:

def receive = {
    //PROCESS OTHER MESSAGES HERE
    case HttpComplete(result) => //do something with the result
}

This way, you make sure that processing the http results won't violate the actor's state from the exterior, but from the actor's receive loop which is the proper way to do it

Upvotes: 2

Alexey Romanov
Alexey Romanov

Reputation: 170713

There is a far better way:

val promises = urls.map {u =>
  val service = url(u)
  val promise = Http(service OK as.String).either
}

val listPromise = Future.sequence(promises)

listPromise.onComplete { whatever }

Upvotes: 2

Related Questions