Reputation: 695
I have the following piece of code:
// variable arrayToAccess is an array of integers
// anotherArray holds integers also
anotherArray.par.foreach { item =>
  val mathValue = mathematicalCalculation(item)
  if (mathValue > arrayToAccess.last) {
    // append element
    arrayToAccess :+= mathValue
    // sort array and store it in the same variable
    arrayToAccess = arrayToAccess.sortWith((i1, i2) => i1 > i2).take(5)
  }
}
I think that accessing the arrayToAccess variable in that way is not thread safe. How can I implement the above code in a thread-safe manner? Also, can I control the level of parallelism of anotherArray.par (for instance, use only 2 of the 8 available cores)?
Upvotes: 1
Views: 227
Reputation: 40508
You are overthinking it. Just do:
arrayToAccess = anotherArray.par
  .map { mathematicalCalculation _ }
  .seq
  .sorted
  .reverse
  .take(5)
  .toArray // .seq on a parallel array is not a plain Array, so convert back
It yields the same result as your code is intended to, but it is thread safe: the parallel step only maps each element and never touches shared mutable state.
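Update: If you are worried about the time the sort step would take, you could select the top five in a single pass instead (linear in the input size for a fixed n):
import scala.collection.mutable.PriorityQueue

def top(data: Array[Int], n: Int): Array[Int] = {
  // reversed ordering: dequeue removes the smallest element kept so far
  val queue = PriorityQueue.empty[Int](Ordering[Int].reverse)
  data.foldLeft(queue) { case (q, x) =>
    q.enqueue(x)
    while (q.size > n) q.dequeue()
    q
  }
  .toArray
  .sorted
  .reverse
}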
Regarding configuring the parallelism, the parallel collections configuration guide should help (see the tasksupport field): http://docs.scala-lang.org/overviews/parallel-collections/configuration
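As a rough illustration of what that page describes, here is a minimal sketch that limits a parallel collection to two worker threads through its tasksupport field. It assumes Scala 2.13 or later with the separate scala-parallel-collections module on the classpath (on 2.12, where parallel collections are built in, drop the CollectionConverters import). The object name, the topFive helper, the sample data, and the body of mathematicalCalculation are made up for the example.

```scala
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport
// Assumption: Scala 2.13+ with the scala-parallel-collections module.
// On Scala 2.12 remove this import; .par is available without it.
import scala.collection.parallel.CollectionConverters._

object ParallelismDemo {
  // Hypothetical stand-in for the question's mathematicalCalculation.
  def mathematicalCalculation(x: Int): Int = x * x

  // Maps in parallel on a pool of `parallelism` threads, then keeps the
  // five largest results in descending order.
  def topFive(data: Array[Int], parallelism: Int): List[Int] = {
    val pool = new ForkJoinPool(parallelism)
    try {
      val parallel = data.par
      // Route this collection's tasks through the bounded pool.
      parallel.tasksupport = new ForkJoinTaskSupport(pool)
      parallel.map(mathematicalCalculation).seq.sorted.reverse.take(5).toList
    } finally {
      pool.shutdown() // release the worker threads
    }
  }

  def main(args: Array[String]): Unit = {
    val anotherArray = Array(4, 9, 1, 7, 3, 8, 2, 6) // made-up sample data
    println(topFive(anotherArray, 2).mkString(", "))
  }
}
```

The pool size caps how many threads work on this one collection; other parallel collections keep using the default pool unless their tasksupport is changed as well.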
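Update: If you are concerned about the sorting step, you could replace it with a parallel sort, or fold the data into a bounded priority queue in linear time, like this:
import scala.collection.mutable.PriorityQueue

def topN(data: Array[Int], n: Int) = {
  // reversed ordering turns the queue into a min-heap, so dequeue drops the smallest
  val queue = PriorityQueue.empty[Int](Ordering[Int].reverse)
  data.foldLeft(queue) { case (q, x) =>
    q.enqueue(x)
    while (q.size > n) q.dequeue()
    q
  // dequeueAll yields ascending order here, so reverse for largest-first
  }.dequeueAll.reverse
}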
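To make the bounded-queue idea concrete, here is a small self-contained sketch with made-up sample data. It restates the approach with a plain foreach loop rather than a fold, and the object name and the input values are assumptions for illustration only.

```scala
import scala.collection.mutable.PriorityQueue

object TopNDemo {
  // Keeps the n largest values using a min-heap that never grows past n.
  def topN(data: Array[Int], n: Int): Array[Int] = {
    // Reversed ordering makes dequeue() remove the smallest element.
    val queue = PriorityQueue.empty[Int](Ordering[Int].reverse)
    data.foreach { x =>
      queue.enqueue(x)
      if (queue.size > n) queue.dequeue()
    }
    // At most n elements remain, so this final sort is cheap.
    queue.toArray.sorted(Ordering[Int].reverse)
  }

  def main(args: Array[String]): Unit = {
    val sample = Array(7, 42, 3, 19, 88, 1, 56, 23) // made-up data
    println(topN(sample, 5).mkString(", ")) // prints 88, 56, 42, 23, 19
  }
}
```

Each input element costs one heap insertion and at most one removal on a heap of at most n + 1 elements, so for a fixed small n the whole pass is linear in the input size.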
Upvotes: 2