Joe
Joe

Reputation: 1080

Akka actor pool for blocking requests

I am trying to use a thread pool to make blocking requests. The problem is, each request is blocking the whole pool and items are process sequentially. Not sure if this is even possible. Somebody please help

city-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 16
  }
  throughput = 100
}

And Java

        Props props = Props.create(CityDataProcessorActor.class, psRespHolder).withDispatcher("akka.actor.city-dispatcher");

    SmallestMailboxPool pool = new SmallestMailboxPool(10);

    ActorRef cityRequestActorPool = actorSystem.actorOf(pool.props(props), "city-request-route");
    for (String city : citiesArray) {
        Future<Object> future = Patterns.ask(cityRequestActorPool, new CityCommand(city.trim()), timeout);
        Object results = Await.result(future, duration);
        log.info(results.toString());
    }

Upvotes: 1

Views: 461

Answers (2)

Advika
Advika

Reputation: 595

As @Mon Calamari mentioned Object results = Await.result(future, duration); is a blocking call. you can try future with callback

future onComplete{ case Success()=> println(result) case Failure()=> println("some error") }

Upvotes: 1

WillD
WillD

Reputation: 875

Mon Calamari's comment is exactly correct. Here's an implementation. It will create a List of Futures as you create them. Then it blocks on the collected Futures sequentially to log each one. The awaits should become trivial as the iteration progresses, providing later Futures have completed in similar time.

....
Array<Future<Object>> futures = new ArrayList<>();
for (String city : citiesArray) {
    Future<Object> future = Patterns.ask(cityRequestActorPool, new CityCommand(city.trim()), timeout);
    futures.add(future);
}

for (<Future<Object>> f :futures){
  Object results = Await.result(f, duration);
  log.info(results.toString());
}

Upvotes: 0

Related Questions