alex

Lazy Pagination in Scala (Stream/Iterator of Iterators?)

I'm reading a very large number of records sequentially from a database API, one page at a time (with an unknown number of records per page), via a call to def readPage(pageNumber: Int): Iterator[Record]

I'm trying to wrap this API lazily and in a functional way (ideally with no mutable state and a constant memory footprint) in something like Stream[Iterator[Record]] or Iterator[Iterator[Record]], so that I can treat it as an infinite stream of pages, or a sequence of iterators, and abstract the pagination away from the client. The client can iterate over the result: each call to next() retrieves the next page (an Iterator[Record]).
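To make the interface described above concrete, here is a minimal sketch, assuming Scala 2.13 or later (where Iterator.unfold exists). Record and readPage below are hypothetical in-memory stand-ins for the real database API. The page number is threaded through unfold as state, so the caller never sees a var, and each next() on the outer iterator returns one page.

```scala
// Hypothetical record type standing in for the asker's Record.
final case class Record(id: Int)

// Hypothetical fake of the database API: three non-empty pages, then only empty ones.
val pageSizes = Vector(3, 2, 4)

def readPage(pageNumber: Int): Iterator[Record] =
  if (pageNumber < pageSizes.length)
    Iterator.tabulate(pageSizes(pageNumber))(i => Record(pageNumber * 100 + i))
  else
    Iterator.empty

// The page number is the unfold state, so no mutable variable is exposed.
// Each step fetches one page; the first empty page ends the outer iterator.
def pages(start: Int = 0): Iterator[Iterator[Record]] =
  Iterator.unfold(start) { n =>
    val page = readPage(n)
    if (page.hasNext) Some((page, n + 1)) else None
  }
```

A client that does not care about page boundaries can call pages().flatten to get a plain Iterator[Record].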

What is the most idiomatic and efficient way to implement this in Scala?

Edit: I need to fetch and process the records one page at a time; I cannot keep the records from all pages in memory. If a page fails, an exception should be thrown. The number of pages/records is large enough to be infinite for all practical purposes. I want to treat it as an infinite stream (or iterator) of pages, each page being an iterator over a finite number of records (e.g. fewer than 1000, but the exact number is unknown ahead of time).
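A sketch of the failure requirement, assuming a hypothetical readPage whose page 2 throws: with a lazy, infinite iterator of pages, nothing is fetched up front, and the exception surfaces exactly when the client asks for the failing page, after the earlier pages have already been processed.

```scala
// Hypothetical record type standing in for the asker's Record.
final case class Record(id: Int)

// Hypothetical source: pages 0 and 1 succeed, page 2 fails like a broken DB call.
def readPage(pageNumber: Int): Iterator[Record] =
  if (pageNumber == 2) throw new RuntimeException(s"page $pageNumber failed")
  else Iterator(Record(pageNumber))

// Infinite and lazy: readPage runs only when the client calls next().
val pages: Iterator[Iterator[Record]] = Iterator.from(0).map(readPage)

val first  = pages.next().toList // fetches page 0 only
val second = pages.next().toList // fetches page 1 only

// The third request reaches page 2 and the exception propagates to the caller.
val failure: Option[String] =
  try { pages.next(); None }
  catch { case e: RuntimeException => Some(e.getMessage) }
```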

I looked at BatchCursor in Monix but it serves a different purpose.

Edit 2: this is the current version. It uses Tomer's answer below as a starting point, but with Stream instead of Iterator. This eliminates the need for tail recursion (as per https://stackoverflow.com/a/10525539/165130) and gives O(1) time for the Stream prepend operation #:: (whereas concatenating iterators with ++ would be O(n)).

Note: while Streams are lazily evaluated, Stream memoization may still cause memory to blow up, which makes memory management tricky. Changing from val to def when defining the Stream (def pages = readAllPages() below) doesn't seem to have any effect.
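One way to sidestep memoization entirely is to stay with Iterator, which never caches the elements it has already returned. The following is a minimal sketch, assuming Scala 2.13 or later and a hypothetical readPage fake. Apart from the page currently being read, the pagination layer keeps no references to earlier pages. The trade-off is that an Iterator is single-pass, so a new one must be built to read the data again.

```scala
// Hypothetical record type standing in for the asker's Record.
final case class Record(id: Int)

// Hypothetical stand-in for the database call: pages 0..4 hold two records each,
// every later page is empty.
def readPage(pageNumber: Int): Iterator[Record] =
  if (pageNumber < 5) Iterator(Record(2 * pageNumber), Record(2 * pageNumber + 1))
  else Iterator.empty

// Iterator.from is infinite; map and takeWhile are lazy and keep no history,
// so consumed pages can be garbage-collected. Checking hasNext in the predicate
// does not consume a record, so each page reaches the client intact.
def pages(): Iterator[Iterator[Record]] =
  Iterator.from(0).map(readPage).takeWhile(_.hasNext)

// Client side: flatten hides the pagination entirely.
def records(): Iterator[Record] = pages().flatten
```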

def readAllPages(pageNumber: Int = 0): Stream[Iterator[Record]] = {
  val iter: Iterator[Record] = readPage(pageNumber)
  if (iter.isEmpty)
    Stream.empty
  else
    iter #:: readAllPages(pageNumber + 1) // #:: evaluates its tail lazily
}
      
// usage
val pages = readAllPages()
for {
  page   <- pages
  record <- page
  if isValid(record)
} process(record)
 

Edit 3: the second suggestion by Tomer seems to be the best. Its runtime and memory footprint are similar to the solution above, but it is much more concise and less error-prone.

val pages = Stream.from(1).map(readPage).takeWhile(_.nonEmpty)

Note: Stream.from(1) creates an infinite stream that starts at 1 and increments by 1 (see the API docs).
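In Scala 2.13 and later, Stream is deprecated in favour of LazyList, which offers the same from, map and takeWhile operations. A sketch of the same one-liner with LazyList, again using a hypothetical readPage fake. LazyList still memoizes the elements it has evaluated for as long as something holds its head, so the memory caveat above applies to it as well.

```scala
// Hypothetical record type standing in for the asker's Record.
final case class Record(id: Int)

// Hypothetical source: pages 1..3 contain one record each, page 4 onwards is empty.
def readPage(pageNumber: Int): Iterator[Record] =
  if (pageNumber <= 3) Iterator(Record(pageNumber)) else Iterator.empty

// LazyList.from(1) is the infinite sequence 1, 2, 3, ...; map and takeWhile are lazy,
// so readPage is only called as pages are demanded. A def builds a fresh LazyList
// per call instead of keeping one memoized instance alive.
def pages: LazyList[Iterator[Record]] =
  LazyList.from(1).map(readPage).takeWhile(_.nonEmpty)
```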

Answers (1)

Tomer Shetah

You can try implementing logic like this:

def readPage(pageNumber: Int): Iterator[Record] = ???

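// No @tailrec here: the recursive call is not in tail position (it is the
// by-name argument of ++), so the annotation would be a compile error.
// Because ++ takes its argument by name, the next page is only read
// when the client asks for it, so the recursion does not run eagerly.
def readAllPages(pageNumber: Int): Iterator[Iterator[Record]] = {
  val iter = readPage(pageNumber)
  if (iter.nonEmpty) {
    // Emit this page, followed lazily by the remaining pages
    Iterator(iter) ++ readAllPages(pageNumber + 1)
  } else {
    Iterator.empty
  }
}

readAllPages(0)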

A shorter option would be:

Stream.from(1).map(readPage).takeWhile(_.nonEmpty)
