Reputation: 1080
I am trying to use a thread pool to make blocking requests. The problem is that each request blocks the whole pool, so the items are processed sequentially. I'm not sure whether what I want is even possible. Can somebody please help? Here is the dispatcher configuration:
city-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 16
}
throughput = 100
}
And the Java code:
Props props = Props.create(CityDataProcessorActor.class, psRespHolder).withDispatcher("akka.actor.city-dispatcher");
SmallestMailboxPool pool = new SmallestMailboxPool(10);
ActorRef cityRequestActorPool = actorSystem.actorOf(pool.props(props), "city-request-route");
for (String city : citiesArray) {
Future<Object> future = Patterns.ask(cityRequestActorPool, new CityCommand(city.trim()), timeout);
Object results = Await.result(future, duration);
log.info(results.toString());
}
Upvotes: 1
Views: 461
Reputation: 595
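As @Mon Calamari mentioned,
Object results = Await.result(future, duration);
is a blocking call, and because it sits inside the loop, the next request is not sent until the previous one has returned. You can try using the future with a callback instead (Scala):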
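import scala.util.{Failure, Success}
// onComplete needs an implicit ExecutionContext in scope
future onComplete {
case Success(result) => println(result)
case Failure(e) => println("some error: " + e.getMessage)
}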
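Since the question is in Java, here is roughly what the callback idea looks like there. This is only a sketch using the JDK's CompletableFuture as a stand-in for the future returned by Patterns.ask, so it runs without Akka; the class name CallbackSketch and the helpers describe and withCallback are made up for illustration and are not part of any Akka API.

```java
import java.util.concurrent.CompletableFuture;

public class CallbackSketch {
    // Hypothetical helper: turns either outcome into a log line.
    static String describe(Object result, Throwable error) {
        return error == null ? "result: " + result : "some error: " + error.getMessage();
    }

    // Attaches the callback and returns immediately; nothing blocks here.
    static CompletableFuture<String> withCallback(CompletableFuture<Object> future) {
        return future.handle(CallbackSketch::describe);
    }

    public static void main(String[] args) {
        // Stand-in for the reply future of one city request (assumption).
        CompletableFuture<Object> reply = new CompletableFuture<>();
        withCallback(reply).thenAccept(System.out::println);
        // The callback fires only once the reply arrives.
        reply.complete("Paris data");
    }
}
```

The calling thread is free to send the next request straight after registering the callback, which is what the loop in the question is missing.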
Upvotes: 1
Reputation: 875
Mon Calamari's comment is exactly correct. Here's an implementation. It collects the Futures in a List as it creates them, so every request is sent before anything blocks. It then awaits the collected Futures one at a time to log each result. The later awaits should return almost immediately, provided the later Futures complete in about the same time as the earlier ones.
....
// Send every request first; nothing blocks in this loop.
List<Future<Object>> futures = new ArrayList<>();
for (String city : citiesArray) {
    Future<Object> future = Patterns.ask(cityRequestActorPool, new CityCommand(city.trim()), timeout);
    futures.add(future);
}
// Then wait for each reply in turn and log it.
for (Future<Object> f : futures) {
    Object results = Await.result(f, duration);
    log.info(results.toString());
}
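If you want to try the same two-phase pattern without an actor system, the sketch below does it with only the JDK. It is not the Akka code above: a fixed thread pool and CompletableFuture stand in for the router and Patterns.ask, and fetchCity is a hypothetical placeholder for the blocking request.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FanOutSketch {
    // Hypothetical stand-in for the blocking per-city request.
    static String fetchCity(String city) {
        try {
            Thread.sleep(100); // simulate blocking I/O
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "data for " + city;
    }

    static List<String> fetchAll(List<String> cities, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            // Phase 1: start every request without waiting on any of them.
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (String city : cities) {
                futures.add(CompletableFuture.supplyAsync(() -> fetchCity(city.trim()), pool));
            }
            // Phase 2: collect the results in submission order.
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> f : futures) {
                results.add(f.join());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        for (String line : fetchAll(Arrays.asList("Paris", " Oslo ", "Lima"), 16)) {
            System.out.println(line);
        }
    }
}
```

Because the blocking join calls happen only after all tasks have been submitted, the requests overlap as long as the pool has enough threads. Moving the join into the first loop would bring back the sequential behaviour from the question.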
Upvotes: 0