Bill Burcham

Reputation: 739

function forwarding Stream parameter to another function retains reference

I am able to write a Stream-processing function, drop(n,s), that scales to very large streams. But when I write another function, nth(n,s), that takes a stream s and forwards it to drop(n,s), it seems that nth() is "holding on to the head" of the stream. This results in an OutOfMemoryError for large n.

Here's the code:

import scala.annotation.tailrec
import scala.collection.immutable.Stream.cons

object Streams {

  def iterate[A](start: A, f: A => A): Stream[A] =
    cons(start, iterate(f(start), f))

  def inc(n:Int) = n + 1

  def naturals() =
    iterate(1, inc)

  @tailrec def drop[T](n : Int, s : Stream[T]) : Stream[T] =
    if (n <= 0 || s.isEmpty) s
    else drop(n-1, s.tail)

  // @inline didn't seem to help
  def nth[T](n : Int, s : Stream[T]) =
    drop(n,s).head

  def N = 1e7.toInt

  def main(args: Array[String]): Unit = {
    println(drop(N,naturals()).head) // works fine for large N
    println(nth(N, naturals())) // results in OutOfMemoryError for N set to 1e7.toInt and -Xmx10m
  }

}

My experience with a related Java question ("why does this Java method leak, and why does inlining it fix the leak?") leads me to believe that the Scala-generated code for nth() is not aggressive enough about clearing s before calling drop(). The trick used in the Clojure library (a plain Java trick; see that linked question) doesn't work here, because all Scala function parameters are vals (not vars), so they can't be reassigned to null.

How can I write a scalable nth() in terms of drop()?

Here is a related Scala compiler bug from 2009-2011 (reduceLeft() implemented in terms of foldLeft()): https://issues.scala-lang.org/browse/SI-2239

I can't tell from that Scala bug ticket how they fixed it. There was a suggestion in the ticket that the only way to fix it was to duplicate the foldLeft() code in reduceLeft(). I really hope that isn't the answer.

Update: Andrey Tyukin's answer https://stackoverflow.com/a/52209383/156550 fixes it. Now I have:

// have to call by name (s) here, otherwise we hold on to head!
def nth[T](n : Int, s : => Stream[T]) =
  drop(n,s).head

And nth(n,s) scales fine.

Upvotes: 3

Views: 114

Answers (1)

Andrey Tyukin

Reputation: 44957

Here is a quick-&-dirty solution that takes just two extra characters:

def nth[T](n : Int, s: => Stream[T]) = drop(n,s).head

Here is what happens without the =>:

  • When the stream s is passed as argument to nth, it is a reference to an already existing value, previously created by naturals().
  • Because of the .head, drop(n, s) has to return the stream to the stack frame of nth. The stack frame of nth therefore cannot be discarded, and so nth holds on to the reference s.
  • Because the reference to the parameter s is kept by the stack frame of nth while drop is working, all the dropped prefixes cannot actually be garbage-collected (this is because Stream guarantees that if you hold on to its head and iterate through it multiple times, it will return the same results).

Now, if you add =>, then the following happens:

  • The stack frame of nth still cannot be discarded because of .head
  • But: nth does not hold the reference to the head of the stream passed to drop. It only holds a reference to a thunk that produces the Stream.
  • The thunk itself does not take up any significant amount of memory, and it does not hold any references to the head of the created stream, therefore, the prefix of the stream can be garbage-collected.
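The difference between the two parameter styles can be seen in a minimal sketch. The names ByNameDemo, makeValue, strict and byName are made up for this illustration and do not appear in the question. A by-value argument is evaluated once, before the call, and the result is stored in the callee's frame; a by-name argument is passed as a thunk and evaluated each time the parameter is referenced.

```scala
object ByNameDemo {
  // Counts how many times the argument expression is evaluated.
  var evaluations = 0

  // Hypothetical stand-in for an expensive producer such as naturals().
  def makeValue(): Int = {
    evaluations += 1
    42
  }

  // By-value: the argument is evaluated once, before the call,
  // and the result lives in this method's stack frame.
  def strict(x: Int): Int = x + x

  // By-name: the argument is wrapped in a thunk and evaluated
  // every time x is referenced (here: twice).
  def byName(x: => Int): Int = x + x
}
```

Calling ByNameDemo.strict(ByNameDemo.makeValue()) bumps the counter once, while ByNameDemo.byName(ByNameDemo.makeValue()) bumps it twice. In the by-name nth above, s is referenced exactly once, inside the call to drop, so naturals() is still invoked only once, but the resulting stream is never stored in the frame of nth.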

Additional note (Dima's test case):

Note that if the thunk itself simply returns a reference to an already existing Stream, then the behavior is still the same as without =>. For example, if your inc were defined as follows:

def inc(i: Int): Int = {
  println(System.currentTimeMillis())
  i + 1
}

then invoking

val s = naturals()
nth(10, s)
nth(5, s)

would print the current time only ten times (not 15).
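The same experiment can be sketched with a counter instead of println, which makes the result easier to check. This is an illustration under assumptions: the object name MemoDemo and the calls counter are invented here, and the code uses scala.collection.immutable.Stream as in the question (deprecated since Scala 2.13 in favor of LazyList, so newer compilers will emit deprecation warnings).

```scala
import scala.annotation.tailrec
import scala.collection.immutable.Stream.cons

object MemoDemo {
  // Counts invocations of inc, replacing the println in the test case.
  var calls = 0

  def inc(i: Int): Int = {
    calls += 1
    i + 1
  }

  // Same definitions as in the question.
  def iterate[A](start: A, f: A => A): Stream[A] =
    cons(start, iterate(f(start), f))

  def naturals(): Stream[Int] =
    iterate(1, inc)

  @tailrec def drop[T](n: Int, s: Stream[T]): Stream[T] =
    if (n <= 0 || s.isEmpty) s
    else drop(n - 1, s.tail)

  // By-name version from the answer.
  def nth[T](n: Int, s: => Stream[T]): T =
    drop(n, s).head
}
```

With val s = MemoDemo.naturals(), the call MemoDemo.nth(10, s) forces ten tails and so calls inc ten times; the following MemoDemo.nth(5, s) reuses the memoized cells and calls inc zero more times. Because the val s keeps the head reachable, the by-name parameter gives no memory benefit in this situation.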

Upvotes: 2
