Reputation: 4583
I want to create a thread-safe container that uses a Scala Map as a backing store. Rather than expose the user to the underlying Map, I would rather only expose a subset of its methods.
An example might look something like the following...
class MyContainer[A] {
  def add(thing: A): Unit = {
    backingStore = backingStore + (thing.uuid -> thing)
  }
  def filter(p: A => Boolean): Option[Iterable[A]] = {
    val filteredThings = backingStore.values.filter(p)
    if (filteredThings.isEmpty) None else Some(filteredThings)
  }
  def remove(uuid: UUID): Option[A] = backingStore.get(uuid) match {
    case optionalThing @ Some(thing) =>
      backingStore = backingStore - uuid; optionalThing
    case None => None
  }
  @volatile private[this] var backingStore = immutable.HashMap.empty[UUID, A]
}
...I suspect that even though the underlying backing store is immutable and its reference is volatile, the container is not thread-safe.
Suppose that two separate threads have access to an instance of the above container. Thread 1 filters the underlying collection and gets some results; at the same time, Thread 2 removes an item. The results that Thread 1 holds might then contain a reference to the item that Thread 2 removed. There might be other problems as well.
Am I correct that the above implementation is not thread-safe? What would be the most idiomatic way to make the above thread-safe using Scala?
Edit: I would prefer to avoid blocking and synchronization if possible. If blocking/synchronization must be used then is the volatile reference needed? And what would be the point of the immutable collection? Couldn't I just as well use a mutable collection?
Upvotes: 4
Views: 521
Reputation: 31754
Am I correct that the above implementation is not thread-safe?
Yes. It is not thread safe. But it does have the right memory visibility semantics.
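For simplicity, you could make it thread-safe by synchronizing every method on the container itself. Because all access then goes through the same lock, the @volatile annotation is no longer needed: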
import java.util.UUID
import scala.collection.immutable.HashMap
import scala.language.reflectiveCalls // silences the feature warning for the structural type bound on A

class MyContainer[A <: { def uuid: UUID }] {
  // Every method takes the same lock, so each read-modify-write is atomic.
  def add(thing: A): Unit = this.synchronized {
    backingStore = backingStore + (thing.uuid -> thing)
  }
  def filter(p: A => Boolean): Option[Iterable[A]] = this.synchronized {
    val filteredThings = backingStore.values.filter(p)
    if (filteredThings.isEmpty) None else Some(filteredThings)
  }
  def remove(uuid: UUID): Option[A] = this.synchronized {
    backingStore.get(uuid) match {
      case optionalThing @ Some(thing) =>
        backingStore = backingStore - uuid; optionalThing
      case None => None
    }
  }
  private[this] var backingStore = HashMap.empty[UUID, A]
}
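As a rough way to check the synchronized approach, here is a minimal sketch that hammers a trimmed-down container from several threads and counts what survives. Item, SyncContainer and SyncDemo are hypothetical names used only for this illustration; they are not part of the code above.

```scala
import java.util.UUID

// Hypothetical element type and trimmed-down container, for illustration only.
final case class Item(uuid: UUID)

class SyncContainer {
  private[this] var store = Map.empty[UUID, Item]

  // The read of `store` and the write back happen under one lock.
  def add(item: Item): Unit = this.synchronized {
    store = store + (item.uuid -> item)
  }

  def size: Int = this.synchronized { store.size }
}

object SyncDemo {
  // Starts `threads` writers that each add `perThread` fresh items,
  // waits for all of them, and returns the final number of entries.
  def countAfterConcurrentAdds(threads: Int, perThread: Int): Int = {
    val container = new SyncContainer
    val workers = (1 to threads).map { _ =>
      new Thread(new Runnable {
        def run(): Unit =
          (1 to perThread).foreach(_ => container.add(Item(UUID.randomUUID())))
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
    container.size
  }
}
```

With the lock in place, SyncDemo.countAfterConcurrentAdds(4, 1000) should come back as 4000. If you remove the synchronized blocks, you would expect it to fall short on some runs.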
Upvotes: 1
Reputation: 546
You are using a copy-on-write approach, so the only issue with a concurrent read and write is that they are not strictly ordered. That is not really a problem, just a matter of timing: if A is writing while B is reading, there is no guarantee about whether B will see A's edits.
Your real problem arises when C and D write simultaneously: both can read the same starting map, update their own copies, and then write back only their own edits. Whoever writes first will have their changes overwritten.
Consider a starting map containing (A,B), with threads C and D adding entries 'C' and 'D' respectively while threads E and F read the map, all of this happening concurrently. One possible result is:
C reads map (A,B)
D reads map (A,B)
C writes map (A,B,C)
E reads map (A,B,C)
D writes map (A, B, D)
F reads map (A, B, D)
The 'C' entry appeared transiently and was then lost forever.
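The interleaving above can be replayed deterministically on a single thread, which makes the lost update easy to see. This is only a sketch: LostUpdateReplay and the Int values are made up for illustration.

```scala
object LostUpdateReplay {
  val start: Map[String, Int] = Map("A" -> 1, "B" -> 2)

  // C and D both read the shared reference before either one writes.
  val readByC: Map[String, Int] = start
  val readByD: Map[String, Int] = start

  // C writes first; a reader at this point (thread E) sees A, B, C.
  val afterC: Map[String, Int] = readByC + ("C" -> 3)

  // D then writes a map built from its stale read, replacing C's map.
  // A reader at this point (thread F) sees A, B, D and no C.
  val afterD: Map[String, Int] = readByD + ("D" -> 4)
}
```

Each intermediate map is itself consistent, because the collection is immutable. What gets lost is C's update, because D built its new map from a read that was already out of date.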
The only way to reliably sequence the writes is to ensure that the write block is never entered concurrently: either use a synchronized lock to enforce single entry to the write block, or serialise the updates by using a single Akka actor to perform them.
You need to synchronize reads also if you care about ordering of reads vs writes, but if you have multiple threads accessing this, that's unlikely to be a real concern.
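Since the question asks about avoiding blocking, one further option that is not covered above is to keep the immutable map in a java.util.concurrent.atomic.AtomicReference and retry the write when another thread got in first. Treat the following as a sketch under assumptions rather than a recommendation: Thing and CasContainer are hypothetical names, and filter returns a plain Iterable here to keep it short.

```scala
import java.util.UUID
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

// Hypothetical element type, used only for illustration.
final case class Thing(uuid: UUID, name: String)

class CasContainer {
  private[this] val store =
    new AtomicReference[Map[UUID, Thing]](Map.empty)

  // Build the new map from the current one, then publish it only if
  // nobody replaced the reference in the meantime; otherwise retry.
  @tailrec
  final def add(thing: Thing): Unit = {
    val current = store.get()
    val updated = current + (thing.uuid -> thing)
    if (!store.compareAndSet(current, updated)) add(thing)
  }

  @tailrec
  final def remove(uuid: UUID): Option[Thing] = {
    val current = store.get()
    current.get(uuid) match {
      case None => None
      case found =>
        if (store.compareAndSet(current, current - uuid)) found
        else remove(uuid)
    }
  }

  // Readers work on whatever snapshot they happen to get; no lock is taken.
  def filter(p: Thing => Boolean): Iterable[Thing] =
    store.get().values.filter(p)
}
```

A write here never overwrites a map it did not read, so the lost update described above cannot happen. The trade-off is that a writer may have to redo its work under contention. This is also where the immutable collection earns its keep, since readers can hold a snapshot safely without any lock.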
Upvotes: 3