Reputation: 5352
I know I'm doing something wrong with mutable.ListBuffer
but I can't figure out how to fix it (and a proper explanation of the issue).
I simplified the code below to reproduce the behavior.
I'm basically trying to run functions in parallel to add elements to a list as my first list get processed. I end up "losing" elements.
import java.util.Properties
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.concurrent.{ExecutionContext}
import ExecutionContext.Implicits.global
object MyTestObject {
var listBufferOfInts = new ListBuffer[Int]() // files that are processed
def runFunction(): Int = {
listBufferOfInts = new ListBuffer[Int]()
val inputListOfInts = 1 to 1000
val fut = Future.traverse(inputListOfInts) { i =>
Future {
appendElem(i)
}
}
Await.ready(fut, Duration.Inf)
listBufferOfInts.length
}
def appendElem(elem: Int): Unit = {
listBufferOfInts ++= List(elem)
}
}
MyTestObject.runFunction()
MyTestObject.runFunction()
MyTestObject.runFunction()
which returns:
res0: Int = 937
res1: Int = 992
res2: Int = 997
Obviously I would expect 1000
to be returned all the time. How can I fix my code to keep the "architecture" but make my ListBuffer "synchronized" ?
Upvotes: 1
Views: 2273
Reputation: 8663
I don't know what exact problem is as you said you simplified it, but still you have an obvious race condition, multiple threads modify a single mutable collection and that is very bad. As other answers pointed out you need some locking so that only one thread could modify collection at the same time. If your calculations are heavy, appending result in synchronized way to a buffer shouldn't notably affect the performance but when in doubt always measure.
But synchronization is not needed, you can do something else instead, without vars and mutable state. Let each Future
return your partial result and then merge them into a list, in fact Future.traverse
does just that.
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
def runFunction: Int = {
val inputListOfInts = 1 to 1000
val fut: Future[List[Int]] = Future.traverse(inputListOfInts.toList) { i =>
Future {
// some heavy calculations on i
i * 4
}
}
val listOfInts = Await.result(fut, Duration.Inf)
listOfInts.size
}
Future.traverse
already gives you an immutable list with all your results combined, no need to append them to a mutable buffer.
Needless to say, you will always get 1000
back.
@ List.fill(10000)(runFunction).exists(_ != 1000)
res18: Boolean = false
Upvotes: 4
Reputation: 5352
Changing
listBufferOfInts ++= List(elem)
to
synchronized {
listBufferOfInts ++= List(elem)
}
Make it work. Probably can become a performance issue? I'm still interested in an explanation and maybe a better way of doing things!
Upvotes: 0
Reputation: 111
I'm not sure the above shows what you are trying to do correctly. Maybe the issue is that you are actually sharing a var ListBuffer which you reinitialise within runFunction.
When I take this out I collect all the events I'm expecting correctly:
import java.util.Properties
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration.Duration
import scala.concurrent.{ Await, Future }
import scala.concurrent.{ ExecutionContext }
import ExecutionContext.Implicits.global
object BrokenTestObject extends App {
var listBufferOfInts = ( new ListBuffer[Int]() )
def runFunction(): Int = {
val inputListOfInts = 1 to 1000
val fut = Future.traverse(inputListOfInts) { i =>
Future {
appendElem(i)
}
}
Await.ready(fut, Duration.Inf)
listBufferOfInts.length
}
def appendElem(elem: Int): Unit = {
listBufferOfInts.append( elem )
}
BrokenTestObject.runFunction()
BrokenTestObject.runFunction()
BrokenTestObject.runFunction()
println(s"collected ${listBufferOfInts.length} elements")
}
If you really have a synchronisation issue you can use something like the following:
import java.util.Properties
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration.Duration
import scala.concurrent.{ Await, Future }
import scala.concurrent.{ ExecutionContext }
import ExecutionContext.Implicits.global
class WrappedListBuffer(val lb: ListBuffer[Int]) {
def append(i: Int) {
this.synchronized {
lb.append(i)
}
}
}
object MyTestObject extends App {
var listBufferOfInts = new WrappedListBuffer( new ListBuffer[Int]() )
def runFunction(): Int = {
val inputListOfInts = 1 to 1000
val fut = Future.traverse(inputListOfInts) { i =>
Future {
appendElem(i)
}
}
Await.ready(fut, Duration.Inf)
listBufferOfInts.lb.length
}
def appendElem(elem: Int): Unit = {
listBufferOfInts.append( elem )
}
MyTestObject.runFunction()
MyTestObject.runFunction()
MyTestObject.runFunction()
println(s"collected ${listBufferOfInts.lb.size} elements")
}
Upvotes: 1