Aserian

Reputation: 1117

Scala understanding memory usage with parallel collections

I am pretty new to Scala (loving the language) and have been dealing with reading streams/lazy lists lately.

I was messing around with parallelism because I had a task that was taking a very long time to run sequentially with a foldLeft (but didn't need to be done sequentially).

So I looked into Parallel Collections to achieve my goal.

Originally, I tried to apply par to the stream directly. When I did this, memory usage ballooned and the program ground to a halt. This suggests that the parallel collections need to read all the data into memory. I thought they might just keep pulling elements from the stream and processing them in parallel, but that was not the case.


Question 1: Why do Parallel Collections require reading the entire collection into memory? Is this indeed what is happening, or have I misunderstood my above scenario?


So, having figured out that memory usage was the issue, I decided to break the collection into chunks using sliding, apply par to each chunk, and then reduce the results afterwards. The result was beautiful: the code ran ~5x faster, and the best part was that I didn't need to worry about concurrency at all.
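Roughly, the approach looks like this (process and the data here are dummy stand-ins for my real workload, and I've given sliding an explicit step equal to the window size so the chunks don't overlap; on Scala 2.13 the par conversion comes from the separate scala-parallel-collections module):

```scala
// Chunk a lazy input, process each chunk in parallel, reduce the results.
// Scala 2.13+: requires the scala-parallel-collections module.
import scala.collection.parallel.CollectionConverters._

// Hypothetical stand-in for the real per-element work.
def process(x: Int): Long = x.toLong * x

val data = LazyList.from(1).take(1000)

val result = data
  .sliding(150, 150)                        // chunk (step == window size)
  .map(chunk => chunk.par.map(process).sum) // each chunk in parallel
  .sum                                      // reduce the per-chunk results
```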

I tested a few values and settled on a sliding window of 150 elements, which seemed to be the sweet spot: speeds were optimal there before performance started to drop off (perhaps due to the time it took to reduce the results?).

However, having done this, the memory usage still spikes by ~2 GB. Even when I reduce the sliding window to 30 elements, it uses the same amount of memory.


Question 2: Why does the memory spike this much when using this method, even when I reduce the size of the sliding window?


Thanks for your time!

Upvotes: 2

Views: 157

Answers (1)

Dima

Reputation: 40500

First, you probably want grouped, not sliding (the latter is ... well ... sliding: with the default step of 1, the windows overlap and you end up processing the same element many times).
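To see the difference concretely:

```scala
// `sliding` with the default step of 1 emits overlapping windows, so
// every element appears in up to `size` windows; `grouped` partitions
// the input into disjoint chunks.
val xs = (1 to 6).toList

val slid = xs.sliding(3).toList
// List(List(1, 2, 3), List(2, 3, 4), List(3, 4, 5), List(4, 5, 6))

val grp = xs.grouped(3).toList
// List(List(1, 2, 3), List(4, 5, 6))
```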

Also, if you are trying to avoid having everything loaded into memory, you want Iterator, not Stream or LazyList. The latter two are lazy, but they memoize: they hold on to every element that has been evaluated, so by the time you are done traversing, everything is in memory.
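A quick way to see the memoization (the counter is just for illustration):

```scala
// LazyList memoizes: each element is computed once and then cached
// for as long as the LazyList itself is reachable.
var evaluations = 0
val ll = LazyList.from(1).take(3).map { x => evaluations += 1; x }

ll.sum // forces all three elements
ll.sum // second traversal reuses the cached cells
assert(evaluations == 3) // not 6 -- and the cells are still held by `ll`

// An Iterator, by contrast, is consumed as you go: once an element
// has been handed out, nothing keeps it alive.
val it = Iterator.from(1).take(3)
it.sum
assert(it.isEmpty) // fully consumed; the elements can be collected
```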

Something like this perhaps:

import scala.concurrent.{ExecutionContext, Future}

def parallelize[T](
  batchSize: Int,
  input: Iterator[T]
)(process: T => Unit)(implicit ec: ExecutionContext): Future[Unit] =
  Future.traverse(input.grouped(batchSize)) { group =>
    Future { group.foreach(process) }
  }.map(_ => ())
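For what it's worth, a quick (made-up) way to exercise it, repeating the definition so the snippet stands alone:

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

def parallelize[T](
  batchSize: Int,
  input: Iterator[T]
)(process: T => Unit)(implicit ec: ExecutionContext): Future[Unit] =
  Future.traverse(input.grouped(batchSize)) { group =>
    Future { group.foreach(process) }
  }.map(_ => ())

// The `process` here just accumulates a sum; an AtomicLong keeps the
// concurrent updates from the batches safe.
val total = new AtomicLong(0)
val done = parallelize(100, Iterator.from(1).take(1000))(x => total.addAndGet(x))

Await.result(done, 10.seconds)
assert(total.get == 500500) // 1 + 2 + ... + 1000
```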

Upvotes: 0
