Dr. Hans-Peter Störr
Dr. Hans-Peter Störr

Reputation: 25976

Functional processing of Scala streams without OutOfMemory errors

Is it possible to apply functional programming to Scala streams such that the stream is processed sequentially, but the already processed part of the stream can be garbage collected?

For example, I define a Stream that contains the numbers from start to end:

def fromToStream(start: Int, end: Int) : Stream[Int] = {
  if (end < start) Stream.empty
  else start #:: fromToStream(start+1, end)
}

If I sum up the values in a functional style:

println(fromToStream(1,10000000).reduceLeft(_+_))

I get an OutOfMemoryError - perhaps since the stackframe of the call to reduceLeft holds a reference to the head of the stream. But if I do this in iterative style, it works:

var sum = 0
for (i <- fromToStream(1,10000000)) {
  sum += i
}

Is there a way to do this in a functional style without getting an OutOfMemory?

UPDATE: This was a bug in scala that is fixed now. So this is more or less out of date now.

Upvotes: 21

Views: 3630

Answers (4)

axel22
axel22

Reputation: 32335

Yes, you can. The trick is to use tail recursive methods, so that the local stack frame contains the only reference to the Stream instance. Since the method is tail-recursive, the local reference to the previous Stream head will be erased once it recursively calls itself, thus enabling the GC to collect the start of the Stream as you go.

Welcome to Scala version 2.9.0.r23459-b20101108091606 (Java HotSpot(TM) Server VM, Java 1.6.0_20).
Type in expressions to have them evaluated.
Type :help for more information.

scala> import collection.immutable.Stream
import collection.immutable.Stream

scala> import annotation.tailrec
import annotation.tailrec

scala> @tailrec def last(s: Stream[Int]): Int = if (s.tail.isEmpty) s.head else last(s.tail)
last: (s: scala.collection.immutable.Stream[Int])Int

scala> last(Stream.range(0, 100000000))                                                                             
res2: Int = 99999999

Also, you must ensure that the thing you pass to the method last above has only one reference on the stack. If you store a Stream into a local variable or value, it will not be garbage collected when you call the last method, since its argument is not the only reference left to Stream. The code below runs out of memory.

scala> val s = Stream.range(0, 100000000)                                                                           
s: scala.collection.immutable.Stream[Int] = Stream(0, ?)                                                            

scala> last(s)                                                                                                      
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space                                              
        at sun.net.www.ParseUtil.encodePath(ParseUtil.java:84)                                                      
        at sun.misc.URLClassPath$JarLoader.checkResource(URLClassPath.java:674)                                     
        at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:759)                                       
        at sun.misc.URLClassPath.getResource(URLClassPath.java:169)                                                 
        at java.net.URLClassLoader$1.run(URLClassLoader.java:194)                                                   
        at java.security.AccessController.doPrivileged(Native Method)                                               
        at java.net.URLClassLoader.findClass(URLClassLoader.java:190)                                               
        at java.lang.ClassLoader.loadClass(ClassLoader.java:307)                                                    
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)                                            
        at java.lang.ClassLoader.loadClass(ClassLoader.java:248)                                                    
        at scala.tools.nsc.Interpreter$Request$$anonfun$onErr$1$1.apply(Interpreter.scala:978)                      
        at scala.tools.nsc.Interpreter$Request$$anonfun$onErr$1$1.apply(Interpreter.scala:976)                      
        at scala.util.control.Exception$Catch.apply(Exception.scala:80)
        at scala.tools.nsc.Interpreter$Request.loadAndRun(Interpreter.scala:984)                                    
        at scala.tools.nsc.Interpreter.loadAndRunReq$1(Interpreter.scala:579)                                       
        at scala.tools.nsc.Interpreter.interpret(Interpreter.scala:599)                                             
        at scala.tools.nsc.Interpreter.interpret(Interpreter.scala:576)
        at scala.tools.nsc.InterpreterLoop.reallyInterpret$1(InterpreterLoop.scala:472)                             
        at scala.tools.nsc.InterpreterLoop.interpretStartingWith(InterpreterLoop.scala:515)                         
        at scala.tools.nsc.InterpreterLoop.command(InterpreterLoop.scala:362)
        at scala.tools.nsc.InterpreterLoop.processLine$1(InterpreterLoop.scala:243)
        at scala.tools.nsc.InterpreterLoop.repl(InterpreterLoop.scala:249)
        at scala.tools.nsc.InterpreterLoop.main(InterpreterLoop.scala:559)
        at scala.tools.nsc.MainGenericRunner$.process(MainGenericRunner.scala:75)
        at scala.tools.nsc.MainGenericRunner$.main(MainGenericRunner.scala:31)
        at scala.tools.nsc.MainGenericRunner.main(MainGenericRunner.scala)

To summarize:

  1. Use tail-recursive methods
  2. Annotate them as tail-recursive
  3. When you call them, ensure that their argument is the only reference to the Stream

EDIT:

Note that this also works and does not result in an out of memory error:

scala> def s = Stream.range(0, 100000000)                                                   
s: scala.collection.immutable.Stream[Int]

scala> last(s)                                                                              
res1: Int = 99999999

EDIT2:

And in the case of reduceLeft that you require, you would have to define a helper method with an accumulator argument for the result.

For reduceLeft, you need an accumulator argument, which you can set to a certain value using default arguments. A simplified example:

scala> @tailrec def rcl(s: Stream[Int], acc: Int = 0): Int = if (s.isEmpty) acc else rcl(s.tail, acc + s.head)
rcl: (s: scala.collection.immutable.Stream[Int],acc: Int)Int

scala> rcl(Stream.range(0, 10000000))
res6: Int = -2014260032

Upvotes: 13

Dr. Hans-Peter St&#246;rr
Dr. Hans-Peter St&#246;rr

Reputation: 25976

As it turns out, this is a bug in the current implementation of reduceLeft. The problem is that reduceLeft calls foldLeft, and thus the stackframe of reduceLeft holds a reference to the head of the stream during the whole call. foldLeft uses tail-recursion to avoid this problem. Compare:

(1 to 10000000).toStream.foldLeft(0)(_+_)
(1 to 10000000).toStream.reduceLeft(_+_)

These are semantically equivalent. In Scala version 2.8.0 the call to foldLeft works, but the call to reduceLeft throws an OutOfMemory. If reduceLeft would do its own work, this problem would not occur.

Upvotes: 2

huynhjl
huynhjl

Reputation: 41646

When I started learning about Stream I thought this was cool. Then I realized Iterator is what I want to use nearly all the time.

In case you do need Stream but want to make reduceLeft work:

fromToStream(1,10000000).toIterator.reduceLeft(_ + _)

If you try the line above, it will garbage collect just fine. I have found that using Stream is tricky as it's easy to hold on to the head without realizing it. Sometimes the standard lib will hold on to it for you - in very subtle ways.

Upvotes: 19

Alexey Romanov
Alexey Romanov

Reputation: 170745

You may want to look at Scalaz's ephemeral streams.

Upvotes: 2

Related Questions