Intentio
Intentio

Reputation: 72

Scala future and its callback works in the same execution context

I call def activateReward by Akka actors and execution OracleClient.rewardActivate(user) sometimes is very slow (the database is outside of my responsibility and belongs to another company).

When database is slow the thread pool is exhausted and can not effectively allocate more threads to run callbacks future.onComplete because callbacks and futures works in the same execution context.

Please advise how to execute code in the callback asynchronously from threads which allocated for futures OracleClient.rewardActivate(user)

class RewardActivatorHelper {

  private implicit val ec = new ExecutionContext {
    val threadPool = Executors.newFixedThreadPool(1000)
    def execute(runnable: Runnable) {threadPool.submit(runnable)}
    def reportFailure(t: Throwable) {throw t}
  }

  case class FutureResult(spStart:Long, spFinish:Long)

  def activateReward(msg:Msg, time:Long):Unit = {
    msg.users.foreach {
      user =>
        val future:Future[FutureResult] = Future {
          val (spStart, spFinish) = OracleClient.rewardActivate(user)
          FutureResult(spStart, spFinish)
        }

        future.onComplete {
          case Success(futureResult:FutureResult) =>
            futureResult match {
              case res:FutureResult => Logger.writeToLog(Logger.LogLevel.DEBUG,s"started:${res.spStart}finished:${res.spFinish}")
              case _ => Logger.writeToLog(Logger.LogLevel.DEBUG, "some error")
            }

          case Failure(e:Throwable) => Logger.writeToLog(Logger.LogLevel.DEBUG, e.getMessage)    
        }
    }
  }
}

Upvotes: 1

Views: 1098

Answers (1)

Rikard
Rikard

Reputation: 692

You can specify the execution context explicitly instead of implicitly for the onComplete callback by doing something along these lines:

import java.util.concurrent.Executors
import scala.concurrent.duration.Duration

object Example extends App {
  import scala.concurrent._

  private implicit val ec = new ExecutionContext {
    val threadPool = Executors.newFixedThreadPool(1000)
    def execute(runnable: Runnable) {threadPool.submit(runnable)}
    def reportFailure(t: Throwable) {throw t}
  }

  val f = Future {
    println("from future")
  }

  f.onComplete { _ =>
    println("I'm done.")
  }(scala.concurrent.ExecutionContext.Implicits.global)

  Await.result(f, Duration.Inf)
}

This will of course not solve the underlying problem of a database not keeping up, but might be good to know anyway.

To clarify: I let the onComplete callback be handled by the standard global execution context. You might want to create a separate one.

Upvotes: 5

Related Questions