Reputation: 1712
Basically I'm running two queries that return futures against Cassandra, then I need to do some computation and return the value (an average of the values).
Here is my code:
object TestWrapFuture {
  def main(args: Array[String]) {
    val category = 5392
    ExtensiveComputation.average(category).onComplete {
      case Success(s) => println(s)
      case Failure(f) => throw new Exception(f)
    }
  }
}

class ExtensiveComputation {
  val volume = new ListBuffer[Int]()

  def average(categoryId: Int): Future[Double] = {
    val productsByCategory = Product.findProductsByCategory(categoryId)
    productsByCategory.map { prods =>
      for (prod <- prods if prod._2) {
        Sku.findSkusByProductId(prod._1).map { skus =>
          skus.foreach(sku => volume += (sku.height.get * sku.width.get * sku.length.get))
        }
      }
      val average = volume.sum / volume.length
      average
    }
  }
}

object ExtensiveComputation extends ExtensiveComputation
So what is the problem?
The skus.foreach appends the resulting values to a ListBuffer. Since everything is async, when I try to obtain the result in my main, I get an error saying I can't divide by zero.
Indeed, since my Sku.findSkusByProductId returns a Future, by the time I try to compute the average, volume is still empty.
Should I block somewhere before this computation, or should I do something else?
EDIT
Well, I tried to block like this:
val volume = new ListBuffer[Int]()

def average(categoryId: Int): Future[Double] = {
  val productsByCategory = Product.findProductsByCategory(categoryId)
  val blocked = productsByCategory.map { prods =>
    for (prod <- prods if prod._2) {
      Sku.findSkusByProductId(prod._1).map { skus =>
        skus.foreach(sku => volume += (sku.height.get * sku.width.get * sku.length.get))
      }
    }
  }
  Await.result(blocked, Duration.Inf)
  val average = volume.sum / volume.length
  Future.successful(average)
}
Then I got two different results from this piece of code:
Sku.findSkusByProductId(prod._1).map { skus =>
  skus.foreach(sku => volume += (sku.height.get * sku.width.get * sku.length.get))
}
1 - When there are just a few, like 50, to be looked up in Cassandra, it just runs and gives me the result.
2 - When there are many, like 1000, it gives me:
java.lang.ArithmeticException: / by zero
EDIT 2
I tried this code, as @Olivier Michallat proposed:
def average(categoryId: Int): Future[Double] = {
  val productsByCategory = Product.findProductsByCategory(categoryId)
  productsByCategory.map { prods =>
    for (prod <- prods if prod._2) findBlocking(prod._1)
    volume.sum / volume.length
  }
}

def findBlocking(productId: Long) = {
  val future = Sku.findSkusByProductId(productId).map { skus =>
    skus.foreach(sku => volume += (sku.height.get * sku.width.get * sku.length.get))
  }
  Await.result(future, Duration.Inf)
}
And the following as @kolmar proposed:
def average(categoryId: Int): Future[Int] = {
  for {
    prods <- Product.findProductsByCategory(categoryId)
    filtered = prods.filter(_._2)
    skus <- Future.traverse(filtered)(p => Sku.findSkusByProductId(p._1))
  } yield {
    val volumes = skus.flatten.map(sku => sku.height.get * sku.width.get * sku.length.get)
    volumes.sum / volumes.size
  }
}
Both work with a few SKUs to find, like 50, but both fail with many, like 1000, throwing ArithmeticException: / by zero.
It seems it could not compute everything before returning the future...
Upvotes: 2
Views: 401
Reputation: 14224
Since you have to call a function that returns a Future on a sequence of arguments, it's better to use Future.traverse for that. For example:
object ExtensiveComputation {
  def average(categoryId: Int): Future[Double] = {
    for {
      products <- Product.findProductsByCategory(categoryId)
      filtered = products.filter(_._2)
      skus <- Future.traverse(filtered)(p => Sku.findSkusByProductId(p._1))
    } yield {
      val volumes = skus.flatten.map { sku =>
        sku.height.get * sku.width.get * sku.length.get
      }
      volumes.sum / volumes.size
    }
  }
}
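For completeness, a minimal usage sketch of driving this from the question's main, blocking only at the outermost call site (the 30-second timeout is an assumption):

import scala.concurrent.Await
import scala.concurrent.duration._

object TestWrapFuture {
  def main(args: Array[String]): Unit = {
    // Block only here, at the edge of the program; the timeout is arbitrary.
    val avg = Await.result(ExtensiveComputation.average(5392), 30.seconds)
    println(avg)
  }
}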
Upvotes: 2
Reputation: 2312
You need to wait until all the futures generated by findSkusByProductId have completed before you compute the average. So accumulate all these futures in a Seq, call Future.sequence on it to get a Future[Seq], then map that future to a function that computes the average. Then replace productsByCategory.map with a flatMap.
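A minimal sketch of that approach, assuming Product.findProductsByCategory yields a sequence of (id, flag) pairs and Sku.findSkusByProductId returns a Future of a SKU collection:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

def average(categoryId: Int): Future[Double] = {
  Product.findProductsByCategory(categoryId).flatMap { prods =>
    // Collect one future per matching product instead of firing and forgetting them.
    val skuFutures = prods.filter(_._2).map(p => Sku.findSkusByProductId(p._1))
    // Future.sequence turns Seq[Future[Seq[Sku]]] into Future[Seq[Seq[Sku]]].
    Future.sequence(skuFutures).map { skuLists =>
      val volumes = skuLists.flatten.map(s => s.height.get * s.width.get * s.length.get)
      volumes.sum.toDouble / volumes.size
    }
  }
}

The outer flatMap keeps the returned future pending until every inner SKU lookup has completed, so the average is only computed once all the volumes are known.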
Upvotes: 3