Reputation: 1193
I am trying to parallelise a p-norm calculation over an array.
To do that I tried the code below. I understand that I could solve this differently, but I am interested in understanding where the race condition occurs:
    val toSum = Array(0,1,2,3,4,5,6)

    // Calculate the sum over a segment of an array
    def sumSegment(a: Array[Int], p:Double, s: Int, t: Int): Int = {
      val res = {for (i <- s until t) yield scala.math.pow(a(i), p)}.reduceLeft(_ + _)
      res.toInt
    }

    // Calculate the p-norm over an Array a
    def parallelpNorm(a: Array[Int], p: Double): Double = {
      var acc = 0L

      // The worker who should calculate the sum over a slice of an array
      class sumSegmenter(s: Int, t: Int) extends Thread {
        override def run() {
          // Calculate the sum over the slice
          val subsum = sumSegment(a, p, s, t)
          // Add the sum of the slice to the accumulator in a synchronized fashion
          val x = new AnyRef{}
          x.synchronized {
            acc = acc + subsum
          }
        }
      }

      val split = a.size / 2
      val seg_one = new sumSegmenter(0, split)
      val seg_two = new sumSegmenter(split, a.size)
      seg_one.start
      seg_two.start
      seg_one.join
      seg_two.join
      scala.math.pow(acc, 1.0 / p)
    }

    println(parallelpNorm(toSum, 2))
Expected output is 9.5393920142 but instead some runs give me 9.273618495495704 or even 2.23606797749979.
Any idea where the race condition could be happening?
Upvotes: 1
Views: 652
Reputation: 27356
The problem has been explained in the other answer, but a better way to avoid this race condition and improve performance is to use an AtomicInteger:
    import java.util.concurrent.atomic.AtomicInteger

    // Calculate the p-norm over an Array a
    def parallelpNorm(a: Array[Int], p: Double): Double = {
      val acc = new AtomicInteger(0)

      // The worker who should calculate the sum over a slice of an array
      class sumSegmenter(s: Int, t: Int) extends Thread {
        override def run(): Unit = {
          // Calculate the sum over the slice (sumSegment is defined in the question)
          val subsum = sumSegment(a, p, s, t)
          // Add the sum of the slice to the accumulator atomically, without a lock
          acc.getAndAdd(subsum)
        }
      }

      val split = a.length / 2
      val seg_one = new sumSegmenter(0, split)
      val seg_two = new sumSegmenter(split, a.length)
      seg_one.start()
      seg_two.start()
      seg_one.join()
      seg_two.join()
      scala.math.pow(acc.get, 1.0 / p)
    }
Modern processors can do atomic operations without blocking, which can be much faster than explicit synchronisation. In my tests this runs twice as fast as the original code (with correct placement of `x`).
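To illustrate what "atomic without blocking" means, here is a rough sketch of an add built on a compare-and-set retry loop. This is only a conceptual model: `casAdd`, `CasAddSketch`, and `demo` are hypothetical names made up for this example, and the real `AtomicInteger.getAndAdd` may be compiled down to a single hardware instruction rather than a loop like this.

```scala
import java.util.concurrent.atomic.AtomicInteger

object CasAddSketch {
  // Hypothetical helper: add delta by retrying compare-and-set until it succeeds.
  // No thread ever holds a lock; a thread that loses the race simply retries.
  @annotation.tailrec
  def casAdd(counter: AtomicInteger, delta: Int): Int = {
    val current = counter.get()
    val updated = current + delta
    if (counter.compareAndSet(current, updated)) updated
    else casAdd(counter, delta)
  }

  // Four threads each add 1 a thousand times to a shared counter.
  def demo(): Int = {
    val counter = new AtomicInteger(0)
    val threads = (1 to 4).map { _ =>
      new Thread(() => (1 to 1000).foreach(_ => casAdd(counter, 1)))
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    counter.get()
  }
}
```

Because every successful compare-and-set applies exactly one increment, no update is lost even though the threads never block each other.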
Upvotes: 2
Reputation: 3908
Move `val x = new AnyRef{}` outside `sumSegmenter` (that is, into `parallelpNorm`): the problem is that each thread is using its own mutex rather than sharing one.
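A minimal sketch of that change, assuming the rest of the question's code stays as it was. The wrapper object `SharedLockPNorm` and the name `lock` are only for illustration, and `sumSegment` is rewritten slightly (using `.sum`) so the block is self-contained:

```scala
object SharedLockPNorm {
  // Sum of a(i)^p over the half-open index range [s, t), truncated to Int
  // as in the question.
  def sumSegment(a: Array[Int], p: Double, s: Int, t: Int): Int =
    (s until t).map(i => scala.math.pow(a(i), p)).sum.toInt

  def parallelpNorm(a: Array[Int], p: Double): Double = {
    var acc = 0L
    // One lock object, created once per call and shared by every worker.
    val lock = new AnyRef

    class SumSegmenter(s: Int, t: Int) extends Thread {
      override def run(): Unit = {
        val subsum = sumSegment(a, p, s, t)
        // Both threads synchronize on the same monitor, so the
        // read-modify-write of acc can no longer interleave.
        lock.synchronized {
          acc = acc + subsum
        }
      }
    }

    val split = a.length / 2
    val workers = Seq(new SumSegmenter(0, split), new SumSegmenter(split, a.length))
    workers.foreach(_.start())
    workers.foreach(_.join())
    scala.math.pow(acc.toDouble, 1.0 / p)
  }
}
```

With the question's input, the two partial sums are 5 and 86. Losing one of the updates is what produces the square roots of 86 (9.27...) and 5 (2.23...) reported in the question instead of the square root of 91.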
Upvotes: 1