Lasf

Reputation: 2582

How to implement multiple thread pools in a Play application using cats-effect IO

In my Play application, I service my requests using cats-effect's IO instead of Future in the controller, like this (super-simplified):

def handleServiceResult(serviceResult: ServiceResult): Result = ...

def serviceMyRequest(request: Request): IO[ServiceResult] = ...

def myAction = Action { request =>
  handleServiceResult(
    serviceMyRequest(request).unsafeRunSync()
  )
}

Requests are then processed (asynchronously) on Play's default thread pool. Now, I want to implement multiple thread pools to handle different sorts of requests. Were I using Futures, I could do this:

val myCustomExecutionContext: ExecutionContext = ...

def serviceMyRequest(request: Request): Future[ServiceResult] = ...

def myAction = Action.async { request =>
  // serviceMyRequest already returns a Future, so flatten the nested Future before mapping
  Future(serviceMyRequest(request))(myCustomExecutionContext).flatten
    .map(handleServiceResult)(defaultExecutionContext)
}
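For reference, one way a custom ExecutionContext like myCustomExecutionContext could be backed by its own thread pool is sketched below, using only the Scala and Java standard libraries. The pool size, the thread name, and the threadNameOnCustomPool helper are made up for illustration and are not part of my application.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object DedicatedPoolSketch {
  // Hypothetical thread factory: daemon threads with a recognisable name.
  private val factory: ThreadFactory = new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "custom-pool")
      t.setDaemon(true)
      t
    }
  }

  // A dedicated pool for one kind of request; the size 4 is arbitrary.
  val myCustomExecutionContext: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4, factory))

  // Runs a trivial task on the custom pool and returns the name of the thread that ran it.
  def threadNameOnCustomPool(): String =
    Await.result(Future(Thread.currentThread().getName)(myCustomExecutionContext), 5.seconds)
}
```

Calling DedicatedPoolSketch.threadNameOnCustomPool() reports the thread that ran the task, which is a quick way to check that work really lands on the intended pool.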

But I'm not using Futures, I'm using IO, and I'm not sure about the right way to go about implementing it. This looks promising, but seems a bit clunky:

def serviceMyRequest(request: Request): IO[ServiceResult] = ...

def myAction = Action { request =>
  val ioServiceResult = for {
    _ <- IO.shift(myCustomExecutionContext)
    serviceResult <- serviceMyRequest(request)
    _ <- IO.shift(defaultExecutionContext)
  } yield {
    serviceResult
  }
  handleServiceResult(ioServiceResult.unsafeRunSync())
}

Is this the right way to implement it? Is there a best practice here? Am I screwing up badly? Thanks.

Upvotes: 1

Views: 488

Answers (1)

Lasf

Reputation: 2582

Ok, so since this doesn't seem to be well-trodden ground, this is what I ended up implementing:

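import cats.effect.IO
import cats.implicits._ // for *> and <*
import play.api.mvc._
import scala.concurrent.ExecutionContext

trait PlayIO { self: BaseControllerHelpers =>

  implicit class IOActionBuilder[A](actionBuilder: ActionBuilder[Request, A]) {

    // Runs the IO on whichever thread Play invokes the action on.
    def io(block: Request[A] => IO[Result]): Action[A] = {
      actionBuilder.apply(block.andThen(_.unsafeRunSync()))
    }

    // Shifts to executionContext before running the block, then back to Play's default context.
    def io(executionContext: ExecutionContext)(block: Request[A] => IO[Result]): Action[A] = {
      val shiftedBlock = block.andThen(IO.shift(executionContext) *> _ <* IO.shift(defaultExecutionContext))
      actionBuilder.apply(shiftedBlock.andThen(_.unsafeRunSync()))
    }

  }

}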

Then (using the framework from the question) if I mix PlayIO into the controller, I can do this,

val myCustomExecutionContext: ExecutionContext = ...

def handleServiceResult(serviceResult: ServiceResult): Result = ...

def serviceMyRequest(request: Request): IO[ServiceResult] = ...

def myAction = Action.io(myCustomExecutionContext) { request =>
  serviceMyRequest(request).map(handleServiceResult)
}

such that I execute the action's code block on myCustomExecutionContext and then, once complete, thread-shift back to Play's default execution context.
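To see the shifting in isolation, here is a small sketch without Play, assuming cats-effect 1.x or 2.x (where IO.shift exists). The namedContext helper and both single-threaded contexts are hypothetical stand-ins; "default" only plays the role of Play's default execution context.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import cats.effect.IO
import cats.implicits._ // for *> and <*
import scala.concurrent.ExecutionContext

object ShiftSketch {
  // Hypothetical helper: a single-threaded pool whose thread carries the given name.
  def namedContext(name: String): ExecutionContext =
    ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, name)
        t.setDaemon(true)
        t
      }
    }))

  val customContext: ExecutionContext  = namedContext("custom")
  val defaultContext: ExecutionContext = namedContext("default") // stand-in for Play's default

  // Reports the name of the thread the IO is currently running on.
  val currentThread: IO[String] = IO(Thread.currentThread().getName)

  // Same shape as the shifted block above: shift, run, shift back.
  val program: IO[(String, String)] =
    for {
      during <- IO.shift(customContext) *> currentThread <* IO.shift(defaultContext)
      after  <- currentThread
    } yield (during, after)
}
```

Running ShiftSketch.program.unsafeRunSync() should yield ("custom", "default"): the wrapped IO runs on the custom thread and whatever follows runs on the default one. Note that the shift back via <* only happens if the wrapped IO succeeds.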

Update:

This is a bit more flexible:

trait PlayIO { self: BaseControllerHelpers =>

  implicit class IOActionBuilder[R[_], A](actionBuilder: ActionBuilder[R, A]) {

    def io(block: R[A] => IO[Result]): Action[A] = {
      actionBuilder.apply(block.andThen(_.unsafeRunSync()))
    }

    def io(executionContext: ExecutionContext)(block: R[A] => IO[Result]): Action[A] = {
      if (executionContext == defaultExecutionContext) io(block) else {
        val shiftedBlock = block.andThen(IO.shift(executionContext) *> _ <* IO.shift(defaultExecutionContext))
        io(shiftedBlock)
      }
    }

  }

}

Update2:

Per the comment above, this will ensure we always shift back to the default thread pool:

trait PlayIO { self: BaseControllerHelpers =>

  implicit class IOActionBuilder[R[_], A](actionBuilder: ActionBuilder[R, A]) {

    def io(block: R[A] => IO[Result]): Action[A] = {
      actionBuilder.apply(block.andThen(_.unsafeRunSync()))
    }

    def io(executionContext: ExecutionContext)(block: R[A] => IO[Result]): Action[A] = {
      if (executionContext == defaultExecutionContext) io(block) else {
        val shiftedBlock = block.andThen { ioResult =>
          IO.shift(executionContext).bracket(_ => ioResult)(_ => IO.shift(defaultExecutionContext))
        }
        io(shiftedBlock)
      }
    }

  }

}
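As far as I can tell, the shift/bracket/shift pattern above is also what ContextShift#evalOn offers out of the box in cats-effect 1.x and 2.x, so it may be a tidier option. The sketch below is Play-free and rests on assumptions: namedContext, onCustom, and the two single-threaded contexts are hypothetical, and "default" merely stands in for Play's default execution context.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import cats.effect.{ContextShift, IO}
import scala.concurrent.ExecutionContext

object EvalOnSketch {
  // Hypothetical helper: a single-threaded pool whose thread carries the given name.
  def namedContext(name: String): ExecutionContext =
    ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, name)
        t.setDaemon(true)
        t
      }
    }))

  val customContext: ExecutionContext  = namedContext("custom")
  val defaultContext: ExecutionContext = namedContext("default") // stand-in for Play's default

  // A ContextShift whose home pool is the default context.
  val contextShift: ContextShift[IO] = IO.contextShift(defaultContext)

  // Reports the name of the thread the IO is currently running on.
  val currentThread: IO[String] = IO(Thread.currentThread().getName)

  // Runs io on customContext, then returns to the home pool afterwards.
  def onCustom[A](io: IO[A]): IO[A] = contextShift.evalOn(customContext)(io)

  // Even when the wrapped IO fails, the continuation should be back on the home pool.
  val afterFailure: IO[String] =
    onCustom(IO.raiseError[Unit](new RuntimeException("boom"))).attempt
      .flatMap(_ => currentThread)
}
```

With this, the body of the shifting io method could plausibly shrink to block.andThen(contextShift.evalOn(executionContext)(_)), given a ContextShift built from defaultExecutionContext. I have not wired that into the trait above, so treat it as a direction rather than a drop-in replacement.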

Upvotes: 2
