Reputation: 25976
Is it possible to apply functional programming to Scala streams such that the stream is processed sequentially, but the already processed part of the stream can be garbage collected?
For example, I define a Stream
that contains the numbers from start
to end
:
def fromToStream(start: Int, end: Int) : Stream[Int] = {
if (end < start) Stream.empty
else start #:: fromToStream(start+1, end)
}
If I sum up the values in a functional style:
println(fromToStream(1,10000000).reduceLeft(_+_))
I get an OutOfMemoryError
- perhaps since the stackframe of the call to reduceLeft
holds a reference to the head of the stream. But if I do this in iterative style, it works:
var sum = 0
for (i <- fromToStream(1,10000000)) {
sum += i
}
Is there a way to do this in a functional style without getting an OutOfMemory
?
UPDATE: This was a bug in scala that is fixed now. So this is more or less out of date now.
Upvotes: 21
Views: 3630
Reputation: 32335
Yes, you can. The trick is to use tail recursive methods, so that the local stack frame contains the only reference to the Stream
instance. Since the method is tail-recursive, the local reference to the previous Stream
head will be erased once it recursively calls itself, thus enabling the GC to collect the start of the Stream
as you go.
Welcome to Scala version 2.9.0.r23459-b20101108091606 (Java HotSpot(TM) Server VM, Java 1.6.0_20).
Type in expressions to have them evaluated.
Type :help for more information.
scala> import collection.immutable.Stream
import collection.immutable.Stream
scala> import annotation.tailrec
import annotation.tailrec
scala> @tailrec def last(s: Stream[Int]): Int = if (s.tail.isEmpty) s.head else last(s.tail)
last: (s: scala.collection.immutable.Stream[Int])Int
scala> last(Stream.range(0, 100000000))
res2: Int = 99999999
Also, you must ensure that the thing you pass to the method last
above has only one reference on the stack. If you store a Stream
into a local variable or value, it will not be garbage collected when you call the last
method, since its argument is not the only reference left to Stream
. The code below runs out of memory.
scala> val s = Stream.range(0, 100000000)
s: scala.collection.immutable.Stream[Int] = Stream(0, ?)
scala> last(s)
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
at sun.net.www.ParseUtil.encodePath(ParseUtil.java:84)
at sun.misc.URLClassPath$JarLoader.checkResource(URLClassPath.java:674)
at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:759)
at sun.misc.URLClassPath.getResource(URLClassPath.java:169)
at java.net.URLClassLoader$1.run(URLClassLoader.java:194)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
at java.lang.ClassLoader.loadClass(ClassLoader.java:307)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
at java.lang.ClassLoader.loadClass(ClassLoader.java:248)
at scala.tools.nsc.Interpreter$Request$$anonfun$onErr$1$1.apply(Interpreter.scala:978)
at scala.tools.nsc.Interpreter$Request$$anonfun$onErr$1$1.apply(Interpreter.scala:976)
at scala.util.control.Exception$Catch.apply(Exception.scala:80)
at scala.tools.nsc.Interpreter$Request.loadAndRun(Interpreter.scala:984)
at scala.tools.nsc.Interpreter.loadAndRunReq$1(Interpreter.scala:579)
at scala.tools.nsc.Interpreter.interpret(Interpreter.scala:599)
at scala.tools.nsc.Interpreter.interpret(Interpreter.scala:576)
at scala.tools.nsc.InterpreterLoop.reallyInterpret$1(InterpreterLoop.scala:472)
at scala.tools.nsc.InterpreterLoop.interpretStartingWith(InterpreterLoop.scala:515)
at scala.tools.nsc.InterpreterLoop.command(InterpreterLoop.scala:362)
at scala.tools.nsc.InterpreterLoop.processLine$1(InterpreterLoop.scala:243)
at scala.tools.nsc.InterpreterLoop.repl(InterpreterLoop.scala:249)
at scala.tools.nsc.InterpreterLoop.main(InterpreterLoop.scala:559)
at scala.tools.nsc.MainGenericRunner$.process(MainGenericRunner.scala:75)
at scala.tools.nsc.MainGenericRunner$.main(MainGenericRunner.scala:31)
at scala.tools.nsc.MainGenericRunner.main(MainGenericRunner.scala)
To summarize:
Stream
EDIT:
Note that this also works and does not result in an out of memory error:
scala> def s = Stream.range(0, 100000000)
s: scala.collection.immutable.Stream[Int]
scala> last(s)
res1: Int = 99999999
EDIT2:
And in the case of reduceLeft
that you require, you would have to define a helper method with an accumulator argument for the result.
For reduceLeft, you need an accumulator argument, which you can set to a certain value using default arguments. A simplified example:
scala> @tailrec def rcl(s: Stream[Int], acc: Int = 0): Int = if (s.isEmpty) acc else rcl(s.tail, acc + s.head)
rcl: (s: scala.collection.immutable.Stream[Int],acc: Int)Int
scala> rcl(Stream.range(0, 10000000))
res6: Int = -2014260032
Upvotes: 13
Reputation: 25976
As it turns out, this is a bug in the current implementation of reduceLeft. The problem is that reduceLeft calls foldLeft, and thus the stackframe of reduceLeft holds a reference to the head of the stream during the whole call. foldLeft uses tail-recursion to avoid this problem. Compare:
(1 to 10000000).toStream.foldLeft(0)(_+_)
(1 to 10000000).toStream.reduceLeft(_+_)
These are semantically equivalent. In Scala version 2.8.0 the call to foldLeft works, but the call to reduceLeft throws an OutOfMemory. If reduceLeft would do its own work, this problem would not occur.
Upvotes: 2
Reputation: 41646
When I started learning about Stream
I thought this was cool. Then I realized Iterator
is what I want to use nearly all the time.
In case you do need Stream
but want to make reduceLeft
work:
fromToStream(1,10000000).toIterator.reduceLeft(_ + _)
If you try the line above, it will garbage collect just fine. I have found that using Stream is tricky as it's easy to hold on to the head without realizing it. Sometimes the standard lib will hold on to it for you - in very subtle ways.
Upvotes: 19