DevG

Reputation: 494

Ask Akka actor for a result only when all the messages are processed

I am trying to split a large block of text into paragraphs and process them concurrently by calling an external API. Each time the API returns a response for a paragraph, the result is appended to an immutable list.

Once all the paragraphs are processed and the list is complete, I would like to ask the actor for the final list so it can be used in the next steps.

The problem with the approach below is that I never know when all the paragraphs have been processed. I need to get targetStore back only once all the paragraphs are processed and the list is final.

def main(args: Array[String]) {
    val source = Source.fromFile("input.txt")
    val extDelegator = new ExtractionDelegator()
    source.getLines().foreach(line => extDelegator.processParagraph(line))
    extDelegator.getFinalResult()

  }


case class Extract(uuid: UUID, text: String)

case class UpdateList(text: String)

case class DelegateLambda(text: String)


case class FinalResult()


class ExtractionDelegator {
 
  val system = ActorSystem("ExtractionDelegator")
  val extActor = system.actorOf(Props(classOf[ExtractorDelegateActor]).withDispatcher("fixed-thread-pool"))
  implicit val executionContext = system.dispatchers.lookup("fixed-thread-pool")

  def processParagraph(text: String) = {
    extActor ! Extract(UUID.randomUUID(), text)

  }

  def getFinalResult(): java.util.List[String] = {
    implicit val timeout = Timeout(5 seconds)
    val askActor = system.actorOf(Props(classOf[ExtractorDelegateActor]))
    val future = askActor ? FinalResult()
    val result = Await.result(future, timeout.duration).asInstanceOf[java.util.List[String]]
    result
  }

  def shutdown(): Unit = {
    system.terminate()
  }

}


/* Extractor Delegator actor*/
class ExtractorDelegateActor extends Actor with ActorLogging {
  var targetStore:scala.collection.immutable.List[String] = scala.collection.immutable.List.empty

  def receive = {
    case Extract(uuid, text) => {
      context.actorOf(Props[ExtractProcessor].withDispatcher("fixed-thread-pool")) ! DelegateLambda(text)

    }
    case UpdateList(res) => {
      targetStore = targetStore :+ res
    }
    case FinalResult() => {
      val senderActor=sender()
      senderActor ! targetStore

    }
  }
}

/* Aggregator actor*/
class ExtractProcessor extends Actor with ActorLogging {
  def receive = {
    case DelegateLambda(text) => {
      val res =callLamdaService(text)
      sender ! UpdateList(res)
    }

  }

  def callLamdaService(text: String): String = { 
    // This is where the external API is called.
    Thread.sleep(1000)
    result
  }
}

Upvotes: 1

Views: 279

Answers (1)

Evgeny

Reputation: 1770

I am not sure why you want to use actors here. The simplest approach would be to have the service call return a Future:

// because you call an external service, you most probably get an async response back
def callLamdaService(text: String): Future[String]

and then process your text like this:

implicit val ec = scala.concurrent.ExecutionContext.Implicits.global // use your execution context here
Future.sequence(source.getLines().map(callLamdaService)).map {results =>
  // do what you want with results
}
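
For illustration, here is a minimal runnable sketch of the same idea that uses only the Scala standard library. The body of callLamdaService is a hypothetical stand-in (it upper-cases the text after a short sleep) because the real API call is not shown in the question, and processAll is a name made up for this example.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SequenceSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for the external API call (assumption, not the real service)
  def callLamdaService(text: String): Future[String] =
    Future {
      Thread.sleep(100)
      text.toUpperCase
    }

  // Starts one Future per paragraph and completes once every call has finished.
  // Future.sequence keeps the results in the same order as the input list.
  def processAll(paragraphs: List[String]): Future[List[String]] =
    Future.sequence(paragraphs.map(callLamdaService))
}
```

A caller that needs the final list synchronously could block once at the very end, for example with Await.result(SequenceSketch.processAll(lines), 1.minute), where the timeout is chosen to fit the expected total processing time.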

If you still want to use actors, you can replace callLamdaService with processParagraph, which internally performs an ask on a worker actor that returns the result (so the signature of processParagraph becomes def processParagraph(text: String): Future[String]).
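
A rough sketch of this ask-based variant with classic Akka actors might look as follows. The worker body is a placeholder for the real API call, and AskingDelegator, processAll, the pool size of 4, and the 5 second timeout are all assumptions made for the example.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.routing.RoundRobinPool
import akka.util.Timeout
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

case class DelegateLambda(text: String)

class ExtractProcessor extends Actor {
  def receive: Receive = {
    case DelegateLambda(text) =>
      // Placeholder for the external API call (assumption)
      sender() ! text.toUpperCase
  }
}

class AskingDelegator(system: ActorSystem) {
  implicit val ec: ExecutionContext = system.dispatcher
  implicit val timeout: Timeout = Timeout(5.seconds) // must cover the slowest call

  // A pool of workers, so several paragraphs can be processed at the same time
  private val workers: ActorRef =
    system.actorOf(RoundRobinPool(4).props(Props[ExtractProcessor]()))

  // Each ask yields a Future that completes when the worker replies
  def processParagraph(text: String): Future[String] =
    (workers ? DelegateLambda(text)).mapTo[String]

  // Completes once every paragraph has been answered, in input order
  def processAll(paragraphs: List[String]): Future[List[String]] =
    Future.sequence(paragraphs.map(processParagraph))
}
```

Note that a single worker actor handles one message at a time, which is why the sketch puts a router in front of several workers. If the real call blocks a thread, the workers should probably run on a dedicated dispatcher, as the question already does with "fixed-thread-pool".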

If you still want to start multiple tasks first and then ask for the result, you just need to use context.become with receive(worker: Int), increasing the worker count on each Extract message and decreasing it on each UpdateList message. You will then also need to delay the handling of FinalResult for as long as the number of in-flight workers is non-zero.
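
The counting approach could be sketched roughly like this with classic Akka actors. The message shapes are simplified compared to the question (no UUID, FinalResult as a case object), the worker body is a placeholder, and the state method is called running here instead of receive(worker: Int); all of these are assumptions made for the example.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

case class Extract(text: String)
case class DelegateLambda(text: String)
case class UpdateList(text: String)
case object FinalResult

class ExtractProcessor extends Actor {
  def receive: Receive = {
    case DelegateLambda(text) =>
      sender() ! UpdateList(text.toUpperCase) // placeholder for the API call
      context.stop(self)                      // one worker per paragraph
  }
}

class ExtractorDelegateActor extends Actor {
  def receive: Receive = running(0, Vector.empty, Nil)

  // pending: workers still in flight; waiting: askers whose reply is deferred
  private def running(pending: Int, store: Vector[String],
                      waiting: List[ActorRef]): Receive = {
    case Extract(text) =>
      context.actorOf(Props[ExtractProcessor]()) ! DelegateLambda(text)
      context.become(running(pending + 1, store, waiting))

    case UpdateList(res) =>
      val updated = store :+ res
      if (pending == 1) {
        // Last worker finished: answer everyone who asked too early
        waiting.foreach(_ ! updated.toList)
        context.become(running(0, updated, Nil))
      } else context.become(running(pending - 1, updated, waiting))

    case FinalResult =>
      if (pending == 0) sender() ! store.toList
      else context.become(running(pending, store, sender() :: waiting))
  }
}

object CountingSketch {
  def run(system: ActorSystem, paragraphs: List[String]): List[String] = {
    implicit val timeout: Timeout = Timeout(30.seconds)
    val delegator = system.actorOf(Props[ExtractorDelegateActor]())
    paragraphs.foreach(p => delegator ! Extract(p))
    // Ask the SAME actor that received the Extract messages
    Await.result((delegator ? FinalResult).mapTo[List[String]], timeout.duration)
  }
}
```

This sketch assumes that FinalResult is sent after all Extract messages, from the same thread, to the same local actor; otherwise the counter could momentarily be zero while paragraphs are still on their way. Results are stored in completion order rather than input order, and the ask timeout has to be longer than the total processing time.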

Upvotes: 1
