MV23

Reputation: 295

Updating an Array via Scala Parallel Collections

I have this buffer of HashMaps, defined as below

import scala.collection.mutable.HashMap

var distinctElementsDefinitionMap: scala.collection.mutable.ArrayBuffer[HashMap[String, Int]] =
  new scala.collection.mutable.ArrayBuffer[HashMap[String, Int]](300)
    with scala.collection.mutable.SynchronizedBuffer[HashMap[String, Int]]

Now, I have a parallel collection of 300 elements

val max_length = 300
val columnArray = (0 until max_length).toParArray
import scala.collection.parallel.ForkJoinTaskSupport
columnArray.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(100))
columnArray foreach(i => {
    // Do Some Computation and get a HashMap
    var distinctElementsMap: HashMap[String, Int] = //Some Value
    //This line might result in Concurrent Access Exception
    distinctElementsDefinitionMap.update(i, distinctElementsMap)
})

I am running a computation-intensive task inside the foreach loop on the columnArray defined above. After its computation completes, each thread should update one particular entry of the distinctElementsDefinitionMap array; each index is written by exactly one thread. Is this update of an array entry safe when multiple threads may be writing to the buffer at the same time? If not, is there a synchronized way of doing it so it's thread-safe? Thank you!

Update: It appears this is really not a safe way to do it; I am getting a java.util.ConcurrentModificationException. Any tips on how to avoid this while still using parallel collections?
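Since the exception points at the shared buffer itself, one way to sidestep it entirely is to let the parallel collection build the result instead of mutating anything shared. This is only a minimal sketch: computeMap is a placeholder for the real per-column computation, and .map returns the results in index order, so no shared mutable buffer is needed at all:

```scala
import scala.collection.mutable.HashMap

// Placeholder for the real computation-intensive work for column i.
def computeMap(i: Int): HashMap[String, Int] =
  HashMap("col" + i -> i)

val max_length = 300

// Each parallel task produces its own HashMap; .map collects them
// in index order, so there is nothing to synchronize.
val distinctElementsDefinitionMap: Array[HashMap[String, Int]] =
  (0 until max_length).par.map(computeMap).toArray
```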

Upvotes: 0

Views: 1189

Answers (1)

om-nom-nom

Reputation: 62835

Use the .groupBy operation; as far as I can judge, it is parallelized (unlike some other methods, such as .sorted):

case class Row(a: String, b: String, c: String)
val data = Vector(
  Row("foo", "", ""), 
  Row("bar", "", ""), 
  Row("foo", "", "")
)

data.par.groupBy(x => x.a).seq
// Map(bar -> ParVector(Row(bar,,)), foo -> ParVector(Row(foo,,), Row(foo,,)))

Hope you got the idea.

Alternatively, if your RAM allows it, parallelize the processing over each column rather than each row; that should be far more efficient than your current approach (less contention).

val columnsCount = 3 // 300 in your case
Vector.range(0, columnsCount).par.map { column =>
  // productElement gives positional access to the case class fields
  data.groupBy(row => row.productElement(column))
}.seq

Though you will likely have memory problems even with a single column (8M rows might be quite a lot).
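If the per-index write pattern from the question is preferred over groupBy, a plain pre-sized Array can serve as the target: each slot is written by exactly one task, and the parallel foreach joins all tasks before returning, so the writes are visible once it completes. A minimal sketch, where the HashMap built inside the loop stands in for the real computation:

```scala
import scala.collection.mutable.HashMap

val max_length = 300

// A fixed-size array is allocated up front, so every index already
// exists; distinct-index writes need no structural synchronization.
val results = new Array[HashMap[String, Int]](max_length)

(0 until max_length).par.foreach { i =>
  results(i) = HashMap("col" + i -> i) // placeholder computation
}
```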

Upvotes: 0
