Kevin

Reputation: 7946

Waiting for multiple results in Akka

What is the proper way to wait for the result of multiple actors in Akka?

The Principles of Reactive Programming Coursera course had an exercise with a replicated key-value store. Without going into the details of the assignment, it required waiting on the acknowledgement of multiple actors before it could indicate the replication was complete.

I implemented the assignment using a mutable map containing the outstanding requests, but I felt the solution had a 'bad smell'. I hoped there was a better way to implement what seems like a common scenario.

In an attempt to uphold the class's honor code by withholding my solution to the exercise, here is an abstract use case that describes a similar problem.

An invoice line item needs to calculate its tax liability. The tax liability is the combination of all the taxes applied to the line item across multiple taxing authorities (e.g., federal, state, police district). If each taxing authority were an actor capable of determining the tax liability of the line item, the line item would need all actors to report before it could go on to report the overall tax liability. What is the best/right way to accomplish this scenario in Akka?

Upvotes: 7

Views: 4332

Answers (4)

SourceCodeBot

Reputation: 175

In my experience, streams work fine in this case.

I start a Source from the ActorRefs, send each one a message with ask inside mapAsync, and collect the responses into a Seq.

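import akka.pattern.ask
import akka.stream.scaladsl.{Sink, Source}

// Assumes an implicit Timeout, Materializer and ExecutionContext are in scope
val f = Source(workers)
  .mapAsync(USED_THREAD_COUNT)(actorRef => (actorRef ? QueryState).mapTo[StateResponse])
  .runWith(Sink.seq)
f onComplete { responses =>
  // validate and work with responses (a Try[Seq[StateResponse]])
}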

I hope it will help you.

Upvotes: 1

cmbaxter

Reputation: 35443

Here is a simplified example of what I believe you are looking for. It shows how a master-like actor spawns some child workers and then waits for all of their responses, handling the case where a timeout occurs while waiting for results. The solution shows how to wait for an initial request and then switch over to a new receive function while waiting for the responses. It also shows how to propagate state into the waiting receive function, which avoids explicit mutable state at the instance level.

import akka.actor.{Actor, ActorRef, Props, ReceiveTimeout}

object TaxCalculator {
  sealed trait TaxType
  case object StateTax extends TaxType
  case object FederalTax extends TaxType
  case object PoliceDistrictTax extends TaxType
  val AllTaxTypes:Set[TaxType] = Set(StateTax, FederalTax, PoliceDistrictTax)

  case class GetTaxAmount(grossEarnings:Double)
  case class TaxResult(taxType:TaxType, amount:Double)  

  case class TotalTaxResult(taxAmount:Double)
  case object TaxCalculationTimeout
}

class TaxCalculator extends Actor{
  import TaxCalculator._
  import context._
  import concurrent.duration._

  def receive =  waitingForRequest

  def waitingForRequest:Receive = {
    case gta:GetTaxAmount =>
      val children = AllTaxTypes map (tt => actorOf(propsFor(tt)))
      children foreach (_ ! gta)
      setReceiveTimeout(2.seconds)
      become(waitingForResponses(sender, AllTaxTypes))
  }

  def waitingForResponses(respondTo:ActorRef, expectedTypes:Set[TaxType], taxes:Map[TaxType, Double] = Map.empty):Receive = {
    case TaxResult(tt, amount) =>
      val newTaxes = taxes ++ Map(tt -> amount)
      if (newTaxes.keySet == expectedTypes){
        respondTo ! TotalTaxResult(newTaxes.values.foldLeft(0.0)(_+_))
        context stop self
      }
      else{
        become(waitingForResponses(respondTo, expectedTypes, newTaxes))
      }

    case ReceiveTimeout =>
      respondTo ! TaxCalculationTimeout
      context stop self
  }

  def propsFor(taxType:TaxType) = taxType match{
    case StateTax => Props[StateTaxCalculator]
    case FederalTax => Props[FederalTaxCalculator]
    case PoliceDistrictTax => Props[PoliceDistrictTaxCalculator]
  }  
}

trait TaxCalculatingActor extends Actor{  
  import TaxCalculator._
  val taxType:TaxType
  val percentage:Double

  def receive = {
    case GetTaxAmount(earnings) => 
      val tax = earnings * percentage
      sender ! TaxResult(taxType, tax)
  }
}

class FederalTaxCalculator extends TaxCalculatingActor{
  val taxType = TaxCalculator.FederalTax
  val percentage = 0.20
}

class StateTaxCalculator extends TaxCalculatingActor{
  val taxType = TaxCalculator.StateTax
  val percentage = 0.10
}

class PoliceDistrictTaxCalculator extends TaxCalculatingActor{
  val taxType = TaxCalculator.PoliceDistrictTax
  val percentage = 0.05
}

Then you could test this out with the following code:

import TaxCalculator._
import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._

implicit val timeout: Timeout = Timeout(5.seconds)

val system = ActorSystem("taxes")
import system._
val cal = system.actorOf(Props[TaxCalculator])
val fut = cal ? GetTaxAmount(1000.00)
fut onComplete{
  case util.Success(TotalTaxResult(amount)) =>
    println(s"Got tax total of $amount")
  case util.Success(TaxCalculationTimeout) =>
    println("Got timeout calculating tax")
  case util.Failure(ex) => 
    println(s"Got exception calculating tax: ${ex.getMessage}")
}

Upvotes: 5

Michael Nash

Reputation: 103

As prior answers have suggested, you may find the ability to compose futures helpful in this case. The best description of Futures (and Promises, which are somewhat related) I know of is here: http://docs.scala-lang.org/overviews/core/futures.html

This may help explain the ways composable futures could answer the need, perhaps more cleanly than actors, or in combination with actors.
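A minimal sketch of the composable-futures idea, using only the Scala standard library. The `TaxFutures` object and its three functions are hypothetical stand-ins for the taxing authorities in the question, and the rates are borrowed from the actor example in another answer. In combination with actors, each `Future` could instead come from an `ask` call.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical stand-ins for the taxing authorities (names and rates are assumptions)
object TaxFutures {
  def federalTax(gross: Double): Future[Double] = Future(gross * 0.20)
  def stateTax(gross: Double): Future[Double] = Future(gross * 0.10)
  def policeDistrictTax(gross: Double): Future[Double] = Future(gross * 0.05)

  // Start all three calculations, then combine them into one Future
  // that completes once every individual result is available.
  def totalTax(gross: Double): Future[Double] = {
    val parts = Seq(federalTax(gross), stateTax(gross), policeDistrictTax(gross))
    Future.sequence(parts).map(_.sum)
  }
}
```

If any of the individual futures fails, the combined future fails as well, so the caller only has to handle a single success or failure.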

Upvotes: 2

almendar

Reputation: 1813

This is a very common problem in Akka. You have multiple actors that will do the job for you and you need to combine them.

The solution proposed by Jamie Allen in his book "Effective Akka" (his example was about getting bank account balances from various types of accounts) is to spawn an actor that in turn spawns the actors that do the work (e.g. calculate your tax), and then waits for all of them to answer.

One catch: you should not use ask, but tell instead.

When you spawn your multiple actors (e.g. FederalTaxActor, StateTaxActor...), you send each of them a message with the data it needs to process. At that point you know how many answers you need to collect. With every response, you check whether all responses have arrived. If not, you keep waiting.

The problem is that you might wait forever if any of the actors fails. So you schedule a timeout message to yourself. If not all answers have arrived when it fires, you report that the operation did not complete successfully.

Akka provides a helper for scheduling such a timeout message to yourself.
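The steps above can be sketched roughly as follows with classic Akka actors. This is an illustration under assumptions, not the code from the book: all message, class, and parameter names are hypothetical, and the timeout is scheduled with `scheduler.scheduleOnce`, which may or may not be the helper referred to above.

```scala
import akka.actor.{Actor, ActorRef, Cancellable, Props}
import scala.concurrent.duration.FiniteDuration

// All names here are hypothetical, chosen for illustration only.
object TaxAggregator {
  final case class CalculateTax(gross: Double)
  final case class TaxAmount(amount: Double)
  final case class TotalTax(amount: Double)
  case object TaxTimedOut
  private case object CollectionTimeout

  def props(workers: Seq[Props], timeout: FiniteDuration): Props =
    Props(new TaxAggregator(workers, timeout))
}

// Spawns one child per worker Props; assumes at least one worker is given.
class TaxAggregator(workers: Seq[Props], timeout: FiniteDuration) extends Actor {
  import TaxAggregator._
  import context.dispatcher // execution context for the scheduler

  def receive: Receive = {
    case request: CalculateTax =>
      // Use tell (!) rather than ask (?): replies come back as plain messages.
      workers.foreach(p => context.actorOf(p) ! request)
      // Schedule a timeout message to ourselves so we never wait forever.
      val timer = context.system.scheduler.scheduleOnce(timeout, self, CollectionTimeout)
      context.become(collecting(sender(), workers.size, 0.0, timer))
  }

  // The count of outstanding replies and the running sum travel as parameters,
  // so no mutable fields are needed.
  def collecting(replyTo: ActorRef, outstanding: Int, sum: Double, timer: Cancellable): Receive = {
    case TaxAmount(amount) =>
      if (outstanding == 1) {
        timer.cancel()
        replyTo ! TotalTax(sum + amount)
        context.stop(self)
      } else {
        context.become(collecting(replyTo, outstanding - 1, sum + amount, timer))
      }
    case CollectionTimeout =>
      replyTo ! TaxTimedOut
      context.stop(self) // also stops any children still running
  }
}

// A hypothetical worker that applies a flat rate and replies to its sender.
class FlatRateTax(rate: Double) extends Actor {
  def receive: Receive = {
    case TaxAggregator.CalculateTax(gross) =>
      sender() ! TaxAggregator.TaxAmount(gross * rate)
  }
}
```

A caller would create one aggregator per request, for example with `system.actorOf(TaxAggregator.props(workerProps, 2.seconds))`, send it `CalculateTax(1000.0)`, and expect either `TotalTax` or `TaxTimedOut` in reply.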

Upvotes: 2
