cool breeze
cool breeze

Reputation: 4811

Converting blocking code to using scala futures

My old code looks something like below, where all db calls blocking.

I need help converting this over to using Futures.

def getUserPoints(username: String): Option[Long]
    db.getUserPoints(username) match {
        case Some(userPoints) => Some(userPoints.total)
        case None => {
            if (db.getSomething("abc").isEmpty) {
                db.somethingElse("asdf") match {
                    case Some(pointId) => {
                        db.setPoints(pointId, username)
                        db.findPointsForUser(username)
                    }
                    case _ => None
                }
            } else {
                db.findPointsForUser(username)
            }
        }       
    }
}

My new API is below where I am returning Futures.

db.getUserPoints(username: String): Future[Option[UserPoints]]
db.getSomething(s: String): Future[Option[Long]]
db.setPoints(pointId, username): Future[Unit]
db.findPointsForUser(username): Future[Option[Long]]

How can I go about converting the above to use my new API that uses futures.

I tried using a for-compr but started to get wierd errors like Future[Nothing].

var userPointsFut: Future[Long] = for {
  userPointsOpt <- db.getUserPoints(username)
  userPoints <- userPointsOpt
} yield userPoints.total

But it gets a bit tricky with all the branching and if clauses and trying to convert it over to futures.

Upvotes: 1

Views: 349

Answers (1)

maasg
maasg

Reputation: 37435

I would argue that the first issue with this design is that the port of the blocking call to a Future should not wrap the Option type:

The blocking call: def giveMeSomethingBlocking(for:Id): Option[T] Should become: def giveMeSomethingBlocking(for:Id): Future[T] And not: def giveMeSomethingBlocking(for:Id): Future[Option[T]]

The blocking call give either a value Some(value) or None, the non-blocking Future version gives either a Success(value) or Failure(exception) which fully preserves the Option semantics in a non-blocking fashion.

With that in mind, we can model the process in question using combinators on Future. Let's see how:

First, lets refactor the API to something we can work with:

type UserPoints = Long
object db {
  def getUserPoints(username: String): Future[UserPoints] = ???
  def getSomething(s: String): Future[UserPoints] = ???
  def setPoints(pointId:UserPoints, username: String): Future[Unit] = ???
  def findPointsForUser(username: String): Future[UserPoints] = ???
}
class PointsNotFound extends Exception("bonk")
class StuffNotFound extends Exception("sthing not found")

Then, the process would look like:

def getUserPoints(username:String): Future[UserPoints] = {
  db.getUserPoints(username)
  .map(userPoints => userPoints /*.total*/)
  .recoverWith{ 
    case ex:PointsNotFound => 
    (for {
      sthingElse <- db.getSomething("abc")
      _ <- db.setPoints(sthingElse, username)
      points <- db.findPointsForUser(username)
    } yield (points))
    .recoverWith{
      case ex: StuffNotFound => db.findPointsForUser(username)
    }
  }
}

Which type-checks correctly.

Edit


Given that the API is set in stone, a way to deal with nested monadic types is to define a MonadTransformer. In simple words, let's make Future[Option[T]] a new monad, let's call it FutureO that can be composed with other of its kind. [1]

case class FutureO[+A](future: Future[Option[A]]) {
  def flatMap[B](f: A => FutureO[B])(implicit ec: ExecutionContext): FutureO[B] = {
    val newFuture = future.flatMap{
      case Some(a) => f(a).future
      case None => Future.successful(None)
    }
    FutureO(newFuture)
  }

  def map[B](f: A => B)(implicit ec: ExecutionContext): FutureO[B] = {
    FutureO(future.map(option => option map f))
  }
  def recoverWith[U >: A](pf: PartialFunction[Throwable, FutureO[U]])(implicit executor: ExecutionContext): FutureO[U] = {
    val futOtoFut: FutureO[U] => Future[Option[U]] = _.future
    FutureO(future.recoverWith(pf andThen futOtoFut))
  }

  def orElse[U >: A](other: => FutureO[U])(implicit executor: ExecutionContext): FutureO[U] = {
      FutureO(future.flatMap{
        case None => other.future
        case _ => this.future
      }) 
    }
  }

And now we can re-write our process preserving the same structure as the future-based composition.

type UserPoints = Long 
object db { 
  def getUserPoints(username: String): Future[Option[UserPoints]] = ???
  def getSomething(s: String): Future[Option[Long]] = ???
  def setPoints(pointId: UserPoints, username:String): Future[Unit] = ???
  def findPointsForUser(username: String): Future[Option[Long]] = ???
}
class PointsNotFound extends Exception("bonk")
class StuffNotFound extends Exception("sthing not found")

def getUserPoints2(username:String): Future[Option[UserPoints]] = {
  val futureOpt = FutureO(db.getUserPoints(username))
  .map(userPoints => userPoints /*.total*/)
  .orElse{ 
    (for {
      sthingElse <- FutureO(db.getSomething("abc"))
      _ <- FutureO(db.setPoints(sthingElse, username).map(_ => Some(())))
      points <- FutureO(db.findPointsForUser(username))
    } yield (points))
    .orElse{
      FutureO(db.findPointsForUser(username))
    }
  }
  futureOpt.future
}

[1] with acknowledgements to http://loicdescotte.github.io/posts/scala-compose-option-future/

Upvotes: 2

Related Questions