teo

Flattening Streams produced from multiple JDBC ResultSets, to prevent loading everything in memory

I am given a List[String] that I need to group into chunks. For each chunk, I need to run a JDBC query that returns a List[String] as a result.

What I'm trying to get to is:

This is what I've done:

Producing a Stream from a ResultSet, given a List[String] (this is the chunk):

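import java.sql.Statement

def resultOfChunk(chunk: List[String])(statement: Statement): Stream[String] = {
  //.. (builds `query` from `chunk`)
  val resultSet = statement.executeQuery(query)
  new Iterator[String] {
    // hasNext advances the cursor, so it must be called exactly once per row (toStream does so)
    def hasNext = resultSet.next()
    def next() = resultSet.getString(1)
  }.toStream
}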
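In resultOfChunk, hasNext is implemented as resultSet.next(), which moves the cursor, so any caller that calls hasNext twice before next() silently skips a row. It works here only because toStream calls hasNext exactly once per element. A slightly more defensive sketch caches the result of advancing; CursorIterator is a hypothetical helper (not a library API), and it takes the "advance" and "read" steps as functions so it can be exercised without a database:

```scala
// Hypothetical helper (not part of any library): wraps an "advance, then read"
// cursor such as a JDBC ResultSet in an Iterator whose hasNext is idempotent.
// With JDBC it would be used as:
//   CursorIterator(() => resultSet.next(), () => resultSet.getString(1)).toStream
object CursorIterator {
  def apply[A](advance: () => Boolean, read: () => A): Iterator[A] =
    new Iterator[A] {
      // None: the cursor has not been advanced for the upcoming element yet
      private var ready: Option[Boolean] = None

      def hasNext: Boolean = {
        // advance at most once per element, however often hasNext is called
        if (ready.isEmpty) ready = Some(advance())
        ready.get
      }

      def next(): A = {
        if (!hasNext) throw new NoSuchElementException("cursor exhausted")
        ready = None // the following hasNext call must advance again
        read()       // read the row the cursor currently points at
      }
    }
}
```

Calling hasNext repeatedly on this iterator no longer consumes rows, so it is safe to hand to any collection method, not just toStream.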

Producing the final list:

val initialList: List[String] = //.. 

val connection = //..
val statement = connection.createStatement
val streams = for {
  chunk <- initialList.grouped(10)
  stream = resultOfChunk(chunk)(statement)
} yield stream

val finalList = streams.flatten

statement.close()
connection.close()

(The variable names are only meant to illustrate the concept.)
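The for-comprehension in the code above only builds a lazy Iterator: no query runs until finalList is consumed. That also means that if finalList is consumed only after statement.close(), the queries run against a closed Statement. A minimal sketch that makes the laziness observable, with a hypothetical fakeResultOfChunk standing in for the JDBC query and counting how often it is called:

```scala
// Hypothetical stand-in for resultOfChunk: instead of running a JDBC query it
// returns one row per key and counts how many "queries" have been run.
object LazyPipeline {
  var queriesRun = 0

  def fakeResultOfChunk(chunk: List[String]): Stream[String] = {
    queriesRun += 1
    chunk.map(key => s"row-for-$key").toStream
  }

  // Same shape as the question's code: an Iterator[Stream[String]], then flatten
  def build(initialList: List[String]): Iterator[String] = {
    val streams = for {
      chunk <- initialList.grouped(10)
      stream = fakeResultOfChunk(chunk)
    } yield stream
    streams.flatten
  }
}
```

With 25 keys, building the pipeline runs no queries; reading the first row runs one, and draining it runs three in total (chunks of 10, 10 and 5).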

It appears to work, but I'm a bit nervous about:

  1. producing an Iterator[Stream] with a for-comprehension. Is this something people do?
  2. flattening said Iterator[Stream] (can I assume they do not get evaluated during the flattening?).
  3. is there any way I can use the final List after I close the connection and statement?
    Say, if I need to do operations that last a long time and do not want to keep the connection open during this, what are my options?
  4. does this code actually prevent loading the whole DB ResultSet into memory at once (which was my actual goal)?


Answers (1)

gzm0

I'll reply one by one:

  1. Sure, why not. You might want to consider flattening in the for-comprehension directly for readability.

    val finalList = for {
      chunk  <- initialList.grouped(10)
      result <- resultOfChunk(chunk)(statement)
    } yield result
    
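  2. See above for flattening. Yes, you can assume they will not be evaluated during flattening: Iterator's map and flatMap are lazy, so a chunk's query only runs once iteration reaches that chunk. (Each Stream still reads its first row as soon as it is created, because Stream is strict in its head.)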

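  3. The Iterator cannot be re-used: initialList.grouped(10) returns an Iterator, so the flattened result is a single-pass Iterator too, and since it is lazy, consuming it after the connection is closed will fail. If you use a Stream instead of an Iterator, then yes, you can, but: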
    • you will have to make sure it is fully evaluated before you close the connection
    • this will store all the data in memory
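  4. Yes, it does: the Statement only has one chunk's ResultSet open at a time, and rows are read from it only as the result is consumed. How many rows the driver itself fetches and buffers per round trip is driver-specific (Statement.setFetchSize is a hint for this).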
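Points 1 and 3 can be checked with a small sketch. fakeResultOfChunk is a hypothetical stand-in for resultOfChunk(chunk)(statement), and the chunk size is 2 rather than 10 so that a three-element list spans two chunks:

```scala
// fakeResultOfChunk is a hypothetical stand-in for resultOfChunk(chunk)(statement)
object FlattenedFor {
  def fakeResultOfChunk(chunk: List[String]): Stream[String] =
    chunk.map(_.toUpperCase).toStream

  // Point 1: flatten directly in the for-comprehension (desugars to flatMap).
  // grouped returns an Iterator, so the result is an Iterator[String].
  def finalRows(initialList: List[String]): Iterator[String] =
    for {
      chunk  <- initialList.grouped(2)
      result <- fakeResultOfChunk(chunk)
    } yield result
}
```

Traversing the result a second time yields nothing, because the first traversal already exhausted the Iterator.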

Based on what I've seen, I'd recommend the following:

val finalList = for {
  chunk  <- initialList.grouped(10).toStream
  result <- resultOfChunk(chunk)(statement)
} yield result

This will give you a Stream[String] that is evaluated as needed (when accessed in sequence). Once it is fully evaluated (for example with finalList.force), you may close the database connection and still use it.
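A sketch of the recommended Stream version, assuming a hypothetical FakeStatement in place of a JDBC Statement (it counts queries and throws once closed). Forcing the stream runs every chunk's query while the statement is open; afterwards the statement can be closed and the memoized rows traversed again without re-querying. As noted in point 3, this keeps all rows in memory.

```scala
// Hypothetical fake statement: counts queries and refuses to run once closed,
// standing in for a JDBC Statement so the example runs without a database.
final class FakeStatement {
  private var closed = false
  var queriesRun = 0

  def query(chunk: List[String]): Stream[String] = {
    if (closed) throw new IllegalStateException("statement is closed")
    queriesRun += 1
    chunk.map(key => s"row-for-$key").toStream
  }

  def close(): Unit = { closed = true }
}

object StreamVersion {
  // Same shape as the recommended code: grouped(10).toStream, flattened in the for
  def finalRows(initialList: List[String], st: FakeStatement): Stream[String] =
    for {
      chunk  <- initialList.grouped(10).toStream
      result <- st.query(chunk)
    } yield result
}
```

Forcing only after close() would fail as soon as a not-yet-evaluated chunk needs the closed statement. On Scala 2.13 and later, Stream is deprecated in favour of LazyList, which can be used the same way.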
