Sudipta Deb
Sudipta Deb

Reputation: 1050

Having trouble with inconsistent behaviour of Scala's Future

I am facing a small problem with seemingly inconsistent behaviour of Scala's Future. Below is the code I have:

package TryFuture

import scala.concurrent.Future
import scala.concurrent.future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Random

/** Simulates preparing a cappuccino as a chain of asynchronous steps.
  *
  * Every step does its work (including its progress printing) inside a
  * `Future`, so it runs on the implicit `ExecutionContext` in scope rather
  * than on the caller's thread. Callers therefore have to wait for the
  * returned future if they want to observe the later steps before exiting.
  */
class FutureCappuccino {
  // Some type aliases, just for getting more meaningful method signatures:
  type CoffeeBeans = String
  type GroundCoffee = String
  case class Water(temperature: Int)
  type Milk = String
  type FrothedMilk = String
  type Espresso = String
  type Cappuccino = String

  // some exceptions for things that might go wrong in the individual steps
  // (we'll need some of them later, use the others when experimenting
  // with the code):
  case class GrindingException(msg: String) extends Exception(msg)
  case class FrothingException(msg: String) extends Exception(msg)
  case class WaterBoilingException(msg: String) extends Exception(msg)
  case class BrewingException(msg: String) extends Exception(msg)

  /** Grinds the beans asynchronously.
    *
    * @param beans the beans to grind
    * @return a future holding the ground coffee; it fails with
    *         [[GrindingException]] when `beans` is `"baked beans"`, because
    *         the check runs inside the future body
    */
  def grind(beans: CoffeeBeans): Future[GroundCoffee] = Future {
    println(s"Start grinding with thread: ${Thread.currentThread().getId()}")
    //Thread.sleep(10)
    if (beans == "baked beans") throw GrindingException("are you joking?")
    println("Finished grinding..")
    s"ground coffee of $beans"
  }

  /** Heats the water asynchronously.
    *
    * @param water the water to heat
    * @return a future holding a copy of `water` with its temperature set to 85
    */
  def heatWater(water: Water): Future[Water] = Future {
    println(s"Heating the water with thread: ${Thread.currentThread().getId()}")
    //Thread.sleep(10)
    println("It's hot!!")
    water.copy(temperature = 85)
  }

  /** Froths the milk asynchronously.
    *
    * @param milk the milk to froth
    * @return a future holding the frothed milk
    */
  def frothMilk(milk: Milk): Future[FrothedMilk] = Future {
    println(s"milk frothing system engaged! with thread: ${Thread.currentThread().getId()}")
    //Thread.sleep(Random.nextInt(10))
    println("shutting down milk frothing system")
    s"frothed $milk"
  }

  /** Brews an espresso asynchronously.
    *
    * Neither argument influences the result; they only express that brewing
    * needs the outcome of the two earlier steps.
    *
    * @param coffee      the ground coffee
    * @param heatedWater the heated water
    * @return a future holding the espresso
    */
  def brew(coffee: GroundCoffee, heatedWater: Water): Future[Espresso] = Future {
    println(s"happy brewing :) with thread: ${Thread.currentThread().getId()}")
    //Thread.sleep(Random.nextInt(10))
    println("it's brewed!")
    "espresso"
  }

  /** Combines espresso and frothed milk asynchronously.
    *
    * @param espresso    the brewed espresso
    * @param frothedMilk the frothed milk
    * @return a future holding the ready message
    */
  def combine(espresso: Espresso, frothedMilk: FrothedMilk): Future[String] = Future {
    println(s"Combine starting with thread: ${Thread.currentThread().getId()}")
    "Your Cappuccino is ready"
  }

  /** Runs grinding, water heating and brewing one after the other.
    *
    * Each generator of the `for` expression only starts once the previous
    * future has completed, so the steps are sequential. The method itself
    * returns immediately; the steps finish later on the execution context.
    *
    * @return a future holding the brewed espresso
    */
  def prepareCappuccino(): Future[Espresso] =
    for {
      coffee <- grind("not baked beans")
      water <- heatWater(Water(25))
      espresso <- brew(coffee, water)
    } yield espresso

}

What I expect is that heatWater will wait for grind to finish, and that brew will then wait for both grind and heatWater to finish. But when I run this program on my MacBook, the only output I get is "Start grinding with thread: 8", and then the execution completes. It does not wait for the rest of the steps. I cannot figure out what I am missing here and why it is not waiting. Any help?

Upvotes: 1

Views: 108

Answers (2)

Sudipta Deb
Sudipta Deb

Reputation: 1050

I finally got the concept clear with the changes below:

package TryFuture

import scala.concurrent.Future
import scala.concurrent.future
import scala.util.{Success,Failure}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Random

/** Simulates preparing a cappuccino with futures.
  *
  * Every step first prints its progress and sleeps for 200 ms on the
  * caller's thread, and only then starts its asynchronous part on the
  * implicit `ExecutionContext` in scope. Calling a step therefore blocks the
  * caller briefly before the `Future` is handed back.
  */
class FutureCappuccino {
  // Some type aliases, just for getting more meaningful method signatures:
  type CoffeeBeans = String
  type GroundCoffee = String
  case class Water(temperature: Int)
  type Milk = String
  type FrothedMilk = String
  type Espresso = String
  type Cappuccino = String

  // some exceptions for things that might go wrong in the individual steps
  // (we'll need some of them later, use the others when experimenting
  // with the code):
  case class GrindingException(msg: String) extends Exception(msg)
  case class FrothingException(msg: String) extends Exception(msg)
  case class WaterBoilingException(msg: String) extends Exception(msg)
  case class BrewingException(msg: String) extends Exception(msg)

  /** Grinds the beans.
    *
    * @param beans the beans to grind
    * @return a future holding the ground coffee; for `"baked beans"` an
    *         already-failed future carrying a [[GrindingException]]
    */
  def grind(beans: CoffeeBeans): Future[GroundCoffee] = {
    println(s"Start grinding with thread: ${Thread.currentThread().getId()}")
    Thread.sleep(200)
    if (beans == "baked beans")
      // Report the problem through the returned future instead of throwing on
      // the caller's thread, so callers can handle it like any other failure.
      Future.failed(GrindingException("are you joking?"))
    else
      Future {
        Thread.sleep(200)
        s"ground coffee of $beans"
      }
  }

  /** Heats the water.
    *
    * @param water the water to heat
    * @return a future holding a copy of `water` with its temperature set to 85
    */
  def heatWater(water: Water): Future[Water] = {
    println(s"Heating the water with thread: ${Thread.currentThread().getId()}")
    Thread.sleep(200)
    Future {
      water.copy(temperature = 85)
    }
  }

  /** Froths the milk.
    *
    * @param milk the milk to froth
    * @return a future holding the frothed milk
    */
  def frothMilk(milk: Milk): Future[FrothedMilk] = {
    println(s"milk frothing system engaged! with thread: ${Thread.currentThread().getId()}")
    Thread.sleep(200)
    Future {
      s"frothed $milk"
    }
  }

  /** Brews an espresso.
    *
    * Neither argument influences the result; they only express that brewing
    * needs the outcome of the two earlier steps.
    *
    * @param coffee      the ground coffee
    * @param heatedWater the heated water
    * @return a future holding the espresso
    */
  def brew(coffee: GroundCoffee, heatedWater: Water): Future[Espresso] = {
    println(s"happy brewing :) with thread: ${Thread.currentThread().getId()}")
    Thread.sleep(200)
    Future {
      "espresso"
    }
  }

  /** Combines espresso and frothed milk.
    *
    * @param espresso    the brewed espresso
    * @param frothedMilk the frothed milk
    * @return a future holding the ready message
    */
  def combine(espresso: Espresso, frothedMilk: FrothedMilk): Future[String] = {
    println(s"Combine starting with thread: ${Thread.currentThread().getId()}")
    Thread.sleep(200)
    Future {
      Thread.sleep(20)
      "Your Cappuccino is ready"
    }
  }

  /** Prepares a cappuccino and logs when the coffee and the final drink are done.
    *
    * Grinding, water heating and milk frothing are started before the `for`
    * expression, so their asynchronous parts can run at the same time; the
    * `for` expression then only chains brewing and combining onto their
    * results.
    *
    * @return a future holding the ready message
    */
  def prepareCappuccino(): Future[Cappuccino] = {
    val coffees = grind("not baked beans")
    val waters = heatWater(Water(25))
    val milks = frothMilk("milk")

    val cappuccino = for {
      coffee <- coffees
      water <- waters
      brewed <- brew(coffee, water)
      milk <- milks
      result <- combine(brewed, milk)
    } yield result

    // Report failures as well; a failed future that nobody inspects is lost silently.
    cappuccino.onComplete {
      case Success(_) => println("combined is done")
      case Failure(t) => Console.err.println(s"combined failed: ${t.getMessage}")
    }

    coffees.onComplete {
      case Success(_) => println("Coffee is done")
      case Failure(t) => Console.err.println(s"Coffee failed: ${t.getMessage}")
    }

    cappuccino
  }

}

And finally ====>

import scala.concurrent.Await
import scala.concurrent.duration._

val myFutureCappuccino = new FutureCappuccino()
val myCoffee = myFutureCappuccino.prepareCappuccino()
// Wait for the result at the edge of the program instead of sleeping for a
// guessed amount of time, then report the outcome on this thread so the
// message cannot be lost when the main thread exits. Await.ready throws a
// TimeoutException if the cappuccino is not done within the limit.
// Any progress callbacks registered inside prepareCappuccino still run
// asynchronously, so their output may interleave with the line printed here.
Await.ready(myCoffee, 5.seconds).value.foreach {
  case Success(t) => println(t)
  case Failure(p) => println(p.getMessage())
}

I am now happy with the output:

Start grinding with thread: 1
Heating the water with thread: 1
milk frothing system engaged! with thread: 1
happy brewing :) with thread: 8
Coffee is done
Combine starting with thread: 8
combined is done
Your Cappuccino is ready

Sharing the answer here hoping it may help someone. Thanks.

Upvotes: 0

Shyamendra Solanki
Shyamendra Solanki

Reputation: 8851

// Excerpt of grind from the question (the rest of the body is elided with "...").
// The bean check runs inside the future block, so for "baked beans" the
// GrindingException fails the returned Future instead of reaching the caller.
def grind(beans: CoffeeBeans): Future[GroundCoffee] = future {
    println("Start grinding with thread: " + Thread.currentThread().getId())

    if (beans == "baked beans") throw GrindingException("are you joking?")

    ...
}

// Chains grinding, water heating and brewing in one for expression. Because
// grind is handed "baked beans" here, the first generator yields a failed
// future and the whole expression results in that failure.
def prepareCappuccino() =
  for {
    groundCoffee <- grind("baked beans")
    hotWater <- heatWater(Water(25))
    espresso <- brew(groundCoffee, hotWater)
  } yield espresso

The grind method throws an exception, causing the for expression to result in a failed future.

When grind is passed "not baked beans", it works as expected.

Your main thread is quitting too soon.

Use Thread.sleep or Await.result

Upvotes: 1

Related Questions