Stereo
Stereo

Reputation: 1193

Why am I getting a race condition in multi-threading scala?

I am trying to parallelise a p-norm calculation over an array.

To achieve that I try the following, I understand I can solve this differently but I am interested in understanding where the race condition is occurring,

val toSum = Array(0,1,2,3,4,5,6)

// Calculate the sum over a segment of an array
def sumSegment(a: Array[Int], p:Double, s: Int, t: Int): Int = {
  val res = {for (i <- s until t) yield scala.math.pow(a(i), p)}.reduceLeft(_ + _)
  res.toInt
}

// Calculate the p-norm over an Array a
def parallelpNorm(a: Array[Int], p: Double): Double = {
  var acc = 0L

  // The worker who should calculate the sum over a slice of an array
  class sumSegmenter(s: Int, t: Int) extends Thread {
    override def run() {
      // Calculate the sum over the slice
      val subsum = sumSegment(a, p, s, t)
      // Add the sum of the slice to the accumulator in a synchronized fashion
      val x = new AnyRef{}
      x.synchronized {
        acc = acc + subsum
      }
    }
  }

  val split = a.size  / 2
  val seg_one = new sumSegmenter(0, split)
  val seg_two = new sumSegmenter(split, a.size)
  seg_one.start
  seg_two.start
  seg_one.join
  seg_two.join
  scala.math.pow(acc, 1.0 / p)
}
println(parallelpNorm(toSum, 2))

Expected output is 9.5393920142 but instead some runs give me 9.273618495495704 or even 2.23606797749979.

Any recommendations where the race condition could happen?

Upvotes: 1

Views: 652

Answers (2)

Tim
Tim

Reputation: 27356

The problem has been explained in the previous answer, but a better way to avoid this race condition and improve performance is to use an AtomicInteger

// Calculate the p-norm over an Array a
def parallelpNorm(a: Array[Int], p: Double): Double = {
  val acc = new AtomicInteger(0)

  // The worker who should calculate the sum over a slice of an array
  class sumSegmenter(s: Int, t: Int) extends Thread {
    override def run() {
      // Calculate the sum over the slice
      val subsum = sumSegment(a, p, s, t)
      // Add the sum of the slice to the accumulator in a synchronized fashion
      acc.getAndAdd(subsum)
    }
  }

  val split = a.length / 2
  val seg_one = new sumSegmenter(0, split)
  val seg_two = new sumSegmenter(split, a.length)
  seg_one.start()
  seg_two.start()
  seg_one.join()
  seg_two.join()
  scala.math.pow(acc.get, 1.0 / p)
}

Modern processors can do atomic operations without blocking which can be much faster than explicit synchronisation. In my tests this runs twice as fast as the original code (with correct placement of x)

Upvotes: 2

Rob Starling
Rob Starling

Reputation: 3908

Move val x = new AnyRef{} outside sumSegmenter (that is, into parallelpNorm) -- the problem is that each thread is using its own mutex rather than sharing one.

Upvotes: 1

Related Questions