Renkai

Reputation: 2121

Is the given code thread safe?

@volatile var aVector = Vector(1, 2, 3)

thread one

aVector +:= getAnInt()

thread two

aVector match {
  case head +: tail =>
    doSomethingWith(head)
    aVector = tail
  case _ =>
}

JVM version: HotSpot 1.8

Scala version: 2.10.5

Upvotes: 4

Views: 264

Answers (2)

Aivean

Reputation: 10882

Short answer: NO.

+:= is not an atomic operation, and neither is the sequence of deconstruction with +: followed by assignment.

So there are two scenarios in which things go wrong:

  1. The first thread reads the Vector and prepends an element. At that moment the second thread reads the Vector, removes the head and reassigns the var. Then the first thread reassigns the var with its prepended Vector.

In this scenario the write from the first thread restores the element that the second thread has already processed, so that element will be processed twice.

  2. The second thread reads the Vector and processes the first element. The first thread kicks in, reads the Vector, prepends an element and reassigns the var. Then the second thread reassigns the var with the Vector without its first element.

In this case the element that was added by the first thread will be lost.
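The two interleavings above can be replayed deterministically on a single thread, which makes the outcome easier to see. This is only a sketch: the object and method names are made up for illustration, and the value 4 stands in for whatever getAnInt() returns.

```scala
object InterleavingReplay {
  // Scenario 1: the first thread's stale write restores an already processed head.
  def staleWriteRestoresHead(): Vector[Int] = {
    var shared = Vector(1, 2, 3)
    val seenByThreadOne = shared   // thread one reads the var
    val seenByThreadTwo = shared   // thread two reads the var
    shared = seenByThreadTwo.tail  // thread two processes 1 and stores Vector(2, 3)
    shared = 4 +: seenByThreadOne  // thread one stores its stale copy with 4 prepended
    shared                         // 1 is back in the Vector and will be processed again
  }

  // Scenario 2: the second thread's stale write discards the new element.
  def prependIsLost(): Vector[Int] = {
    var shared = Vector(1, 2, 3)
    val seenByThreadTwo = shared   // thread two reads the var and processes 1
    shared = 4 +: shared           // thread one reads, prepends 4 and stores
    shared = seenByThreadTwo.tail  // thread two stores the tail of its stale copy
    shared                         // 4 is gone
  }

  def main(args: Array[String]): Unit = {
    println(staleWriteRestoresHead()) // Vector(4, 1, 2, 3)
    println(prependIsLost())          // Vector(2, 3)
  }
}
```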

There are several ways to make this code thread safe:

  1. You can use one of Java's concurrent queues (probably the best and easiest approach)
  2. If you want to go the Scala way, you should consider using actors as producer and consumer
  3. You can design your own synchronization solution
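A minimal sketch of the first option, assuming FIFO order is acceptable and using java.util.concurrent.ConcurrentLinkedQueue. The object and method names (QueueSketch, drain, demo) are hypothetical. The element type is java.lang.Integer rather than Int so that the null returned by poll() on an empty queue is not silently unboxed.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

object QueueSketch {
  // Consumer side: poll() atomically removes the head, or returns null when empty.
  def drain(queue: ConcurrentLinkedQueue[Integer]): Vector[Int] = {
    var seen = Vector.empty[Int]
    var next = queue.poll()
    while (next != null) {
      seen :+= next.intValue
      next = queue.poll()
    }
    seen
  }

  def demo(): Vector[Int] = {
    val queue = new ConcurrentLinkedQueue[Integer]()
    queue.add(1); queue.add(2); queue.add(3)

    // Producer side: add() is safe to call while another thread is polling.
    val producer = new Thread(new Runnable {
      override def run(): Unit = queue.add(4)
    })
    producer.start()

    val early = drain(queue)  // may or may not already contain 4
    producer.join()
    early ++ drain(queue)     // every element shows up exactly once
  }

  def main(args: Array[String]): Unit =
    println(demo())
}
```

Whichever drain call picks up the 4, nothing is lost or handed out twice, because adding and removing are each a single atomic operation on the queue.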

Update

Some clarifications about @volatile and operations on immutable collections. The Scala annotation @volatile is basically Java's volatile keyword. In this case it makes writes to the var aVector immediately visible to other threads (the reference assignment itself is already atomic on the JVM), but it doesn't make the read - update - assign sequence atomic or synchronized.

This Scala code:

@volatile var aVector = Vector(1, 2, 3)
aVector +:= 1

compiles to this Java:

public final class _$$anon$1 {
  private volatile Vector<Object> aVector = package$.MODULE$.Vector().apply(Predef$.MODULE$.wrapIntArray(new int[] { 1, 2, 3 }));

  private Vector<Object> aVector() {
    return (Vector<Object>)this.aVector;
  }

  private void aVector_$eq(final Vector<Object> x$1) {
    this.aVector = x$1;
  }

  {
    this.aVector_$eq(this.aVector().$plus$colon(BoxesRunTime.boxToInteger(1), Vector$.MODULE$.canBuildFrom()));
  }
}

As you can see, the Vector gets read, updated and assigned back via a non-atomic sequence of method calls. The second thread can update it in between.
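If you want to keep the immutable Vector, one way to make the read - update - assign sequence atomic is to hold it in a java.util.concurrent.atomic.AtomicReference and retry with compareAndSet. This is a sketch rather than a drop-in fix: the names AtomicVectorSketch, prepend, takeHead and demo are hypothetical, and the new element is computed before the retry loop so that a slow call such as getAnInt() is not repeated.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

object AtomicVectorSketch {
  // Retry until no other thread has replaced the Vector between our read and our write.
  @tailrec
  def prepend(ref: AtomicReference[Vector[Int]], x: Int): Unit = {
    val current = ref.get()
    if (!ref.compareAndSet(current, x +: current)) prepend(ref, x)
  }

  // Atomically remove the head; None if the Vector is empty.
  @tailrec
  def takeHead(ref: AtomicReference[Vector[Int]]): Option[Int] = {
    val current = ref.get()
    if (current.isEmpty) None
    else if (ref.compareAndSet(current, current.tail)) Some(current.head)
    else takeHead(ref)
  }

  def demo(): Vector[Int] = {
    val ref = new AtomicReference(Vector(1, 2, 3))
    val producer = new Thread(new Runnable {
      override def run(): Unit = (4 to 1000).foreach(i => prepend(ref, i))
    })
    producer.start()

    // Busy loop for brevity: consume until the producer is done and nothing is left.
    var taken = Vector.empty[Int]
    while (producer.isAlive || !ref.get().isEmpty) {
      takeHead(ref).foreach(x => taken :+= x)
    }
    taken
  }

  def main(args: Array[String]): Unit = {
    val taken = demo()
    println(taken.size)                          // 1000
    println(taken.sorted == (1 to 1000).toVector) // true
  }
}
```

The order in which elements are taken depends on timing, but each of the 1000 values is taken exactly once: a compareAndSet only succeeds if the reference is still the one that was read, so a stale write is rejected and retried instead of overwriting the other thread's update.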

Upvotes: 8

Renkai

Reputation: 2121

The code below shows in action that it is not thread safe, but I'm still confused about the volatile annotation.

object TryThreadSafe {
  def main(args: Array[String]) {
    @volatile var aVector = Vector(1, 2, 3)
    val thread = new Thread(new Runnable {
      override def run(): Unit = {
        aVector :+= getAnInt()
      }
    })
    thread.start()
    while (aVector.nonEmpty) {
      aVector match {
        case head +: tail =>
          doSomethingWith(head)
          aVector = tail
        case _ =>
      }
    }
    thread.join()
    while (aVector.nonEmpty) {
      aVector match {
        case head +: tail =>
          doSomethingWith(head)
          aVector = tail
        case _ =>
      }
    }
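    // prints 1 2 3 1 2 3 4 when the thread reads aVector before the main loop empties it:
    // the thread then writes back its stale Vector(1, 2, 3) with 4 appended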
  }

  def getAnInt() = {
    Thread.sleep(10000)
    4
  }

  def doSomethingWith(head: Int) = {
    println(head)
  }
}

Upvotes: 0
