jeffreyveon

Reputation: 13830

Scala - grouping on an ordered iterator lazily

I have an Iterator[Record] which is ordered by record.id, like this:

record.id=1
record.id=1
...
record.id=1
record.id=2
record.id=2
...
record.id=2

Records with a specific ID can occur a large number of times, so I want to write a function that takes this iterator as input and lazily returns an Iterator[Iterator[Record]], one inner iterator per run of equal ids.

I was able to come up with the following, but it fails with a StackOverflowError after 500K records or so:

def groupByIter[T, B](iterO: Iterator[T])(func: T => B): Iterator[Iterator[T]] = new Iterator[Iterator[T]] {
    var iter = iterO
    def hasNext = iter.hasNext

    def next() = {
      val first = iter.next()
      val firstValue = func(first)
      val (i1, i2) = iter.span(el => func(el) == firstValue)
      iter = i2
      Iterator(first) ++ i1
    }
  }
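
For reference, here is roughly how I exercise it (a simplified sketch; the Record case class and the generated data below are just stand-ins for my real input):

case class Record(id: Int) // simplified stand-in for the real Record type

// ids come out as 0, 0, 1, 1, 2, 2, ... i.e. many consecutive runs of equal ids
val records: Iterator[Record] = Iterator.tabulate(1200000)(i => Record(i / 2))

// draining the groups one after another is where the StackOverflowError shows up
groupByIter(records)(_.id).foreach(_.foreach(_ => ()))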

What am I doing wrong?

Upvotes: 4

Views: 1258

Answers (2)

sashaostr

Reputation: 655

Inspired by the chopBy implemented by @Odomontois, here is a chopBy I implemented for Iterator. Of course, each chunk still has to fit in the allocated memory. It doesn't look very elegant, but it seems to work :)

import scala.annotation.tailrec

implicit class IteratorChopOps[A](toChopIter: Iterator[A]) {

  def chopBy[U](f: A => U) = new Iterator[Traversable[A]] {
    // first element of the next chunk, read ahead while finishing the current one
    var next_el: Option[A] = None

    @tailrec
    private def accum(acc: List[A]): List[A] = {
      next_el = None
      val new_acc = hasNext match {
        case true =>
          val next = toChopIter.next()
          acc match {
            case Nil =>
              acc :+ next
            case _ MatchTail t if f(t) == f(next) =>
              // same key as the last accumulated element: keep growing the chunk
              acc :+ next
            case _ =>
              // key changed: stash the element for the next chunk and stop
              next_el = Some(next)
              acc
          }
        case false =>
          next_el = None
          return acc
      }

      next_el match {
        case Some(_) => new_acc
        case None    => accum(new_acc)
      }
    }

    def hasNext = toChopIter.hasNext || next_el.isDefined

    def next: Traversable[A] = accum(next_el.toList)
  }
}

And here is the extractor used above for matching on the tail element:

object MatchTail {
  def unapply[A] (l: Traversable[A]) = Some( (l.init, l.last) )
}
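
For illustration (not part of the original snippet), usage could look like this:

// with the implicit class and MatchTail in scope:
Iterator(1, 1, 2, 2, 2, 3).chopBy(identity).toList
// => List(List(1, 1), List(2, 2, 2), List(3))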

Upvotes: 0

Odomontois

Reputation: 16328

The trouble here is that each Iterator.span call creates another stacked closure around the trailing iterator, and without any trampolining it is very easy to overflow the stack.
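
A rough sketch of the effect (illustrative only, same shape as the code in the question): every time span is called on the previous trailing iterator, the new trailing iterator wraps it once more, so the final hasNext/next has to descend through all the accumulated layers.

var it: Iterator[Int] = Iterator.tabulate(1000000)(_ / 2) // 0,0,1,1,2,2,...
while (it.hasNext) {
  val head = it.next()
  val (group, rest) = it.span(_ == head)
  group.foreach(_ => ()) // drain the current group
  it = rest              // rest wraps the previous `it`: one more layer to walk through
}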

Actually, I don't think there is an implementation that avoids memoizing the elements of the prefix iterator, since the trailing iterator could be accessed before the prefix is drained.

Even in the .span implementation there is a Queue that memoizes elements inside the Leading definition.

So the easiest implementation I could imagine is the following, via Stream.

implicit class StreamChopOps[T](xs: Stream[T]) {
  def chopBy[U](f: T => U): Stream[Stream[T]] = xs match {
    case x #:: _ =>
      def eq(e: T) = f(e) == f(x)
      xs.takeWhile(eq) #:: xs.dropWhile(eq).chopBy(f)
    case _ => Stream.empty
  }
}

It may not be the most performant, as it memoizes a lot. But with proper iteration, the GC should handle the problem of excess intermediate streams.

You could use it as myIterator.toStream.chopBy(f)

A simple check validates that the following code can run without a StackOverflowError:

Iterator.fill(10000000)(Iterator(1,1,2)).flatten //1,1,2,1,1,2,...
  .toStream.chopBy(identity)                     //(1,1),(2),(1,1),(2),...
  .map(xs => xs.sum * xs.size).sum               //60000000

Upvotes: 5
