ChucK

Reputation: 2134

Scala: Functional replacement for iterators over variable sized batches

I'm using Scala to read columns out of our column store, Cassandra. Each column contains a number of entries, n, where n can be between 10 and 20. We read entries in batches (e.g. 1000 at a time) and have to build columns from them; each entry has an ID attached that we can use to group by.

Currently we use an iterator to go through the entries in a batch and detect the start of a new column by comparing the current and previous IDs, and we keep reading batches until we're done. At the end of each batch we have to store the partial column, because the rest of that column will be in the next batch. I've put some pseudo code below to demonstrate the basic algorithm we currently employ.

How could I do this in a functional way? (If n were constant, this would be a simple problem, as we could set the batch size appropriately.)
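For comparison, the constant-n case can be handled with the standard `grouped` method. This is a sketch that assumes every column has exactly `n` entries and that the batch size is a multiple of `n`, so no column straddles a batch boundary. `Entry` is a hypothetical stand-in for the real entry type.

```scala
// Hypothetical entry type: `id` identifies the column an entry belongs to.
final case class Entry(id: Int, value: String)

object FixedSizeColumns {
  // Assumes every column has exactly n entries and batches are aligned to
  // column boundaries, so each group of n consecutive entries is one column.
  def columns(batches: Iterator[Seq[Entry]], n: Int): Iterator[Seq[Entry]] =
    batches.flatMap(batch => batch.grouped(n))
}
```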

Pseudo code:

val resultBuffer // collects all columns
val columnBuffer // collects entries for current column
var currentId    // id of current column

while (batchIterator.hasNext) {
  val batch = batchIterator.getNext
  val entryIterator = batch.entries.iterator

  while (entryIterator.hasNext) {
    val entry = entryIterator.next
    if (entry.id != currentId) {
      currentId = entry.id
      resultBuffer += columnBuilder(columnBuffer)
      columnBuffer.removeAll
      columnBuffer += entry
    } else {
      columnBuffer += entry
    }
  }
}
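A runnable rendering of the pseudo code, useful as a baseline, could look like this. It assumes a hypothetical `Entry` type, batches delivered as `Iterator[Seq[Entry]]`, and a column represented as `List[Entry]` in place of the hypothetical `columnBuilder`. Two small deviations from the pseudo code: a column is only closed when the buffer is non-empty (so the very first entry does not emit an empty column), and the last column is flushed after the loop.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical entry type standing in for a Cassandra entry.
final case class Entry(id: Int, value: String)

object ImperativeColumns {
  // Mirrors the question's loop; a column is represented as List[Entry]
  // in place of the hypothetical columnBuilder.
  def columns(batchIterator: Iterator[Seq[Entry]]): List[List[Entry]] = {
    val resultBuffer = ListBuffer.empty[List[Entry]] // collects all columns
    val columnBuffer = ListBuffer.empty[Entry]       // entries of the current column
    var currentId: Option[Int] = None                // id of the current column

    while (batchIterator.hasNext) {
      val entryIterator = batchIterator.next().iterator
      while (entryIterator.hasNext) {
        val entry = entryIterator.next()
        if (!currentId.contains(entry.id)) {
          // New id: close the previous column (if any) and start a new one.
          if (columnBuffer.nonEmpty) resultBuffer += columnBuffer.toList
          columnBuffer.clear()
          currentId = Some(entry.id)
        }
        columnBuffer += entry
      }
    }
    // The last column is never followed by a different id, so flush it here.
    if (columnBuffer.nonEmpty) resultBuffer += columnBuffer.toList
    resultBuffer.toList
  }
}
```

Because `columnBuffer` outlives the inner loop, a column that starts in one batch is simply continued in the next.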

Upvotes: 0

Views: 299

Answers (1)

Malte Schwerhoff

Reputation: 12852

Here is a sketch of a more functional implementation that uses sliding(2) to compare each entry in a batch with its successor:

val resultBuffer // collects all columns

batchIterator.foreach { batch =>
  val buffer =
    batch.entries.sliding(2).foldLeft(new ColumnBuffer) { case (acc, Seq(curr, next)) =>
      if (curr.id != next.id) {
        resultBuffer += columnBuilder(acc :+ curr) // curr is the last entry of its column
        new ColumnBuffer
      } else
        acc += curr // return acc with curr added
    }

  if (buffer.nonEmpty) resultBuffer += columnBuilder(buffer)
}

Here, the only object that is "global" and thus has to be mutable is resultBuffer. We could even get rid of that by including it as another accumulator in the inner foldLeft and by replacing the outer foreach with another foldLeft.
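A minimal sketch of that fully immutable variant, assuming a hypothetical `Entry` type and representing a column as `Vector[Entry]`. Both accumulators (finished columns and the partial column) live in the outer `foldLeft`, so a partial column at the end of one batch is carried into the next instead of being closed. It folds over the entries one at a time rather than over `sliding(2)` windows, so the last entry of a batch needs no special handling.

```scala
// Hypothetical entry type: `id` identifies the column an entry belongs to.
final case class Entry(id: Int, value: String)

object FoldColumns {
  type Column = Vector[Entry]

  // Accumulator: finished columns so far, plus the column still being filled.
  final case class Acc(done: Vector[Column], partial: Column)

  // Adds one entry: extends the partial column if the id matches,
  // otherwise closes it and starts a new one.
  private def step(acc: Acc, entry: Entry): Acc =
    acc.partial.lastOption match {
      case Some(last) if last.id != entry.id =>
        Acc(acc.done :+ acc.partial, Vector(entry))
      case _ =>
        Acc(acc.done, acc.partial :+ entry)
    }

  def columns(batches: Iterator[Seq[Entry]]): Vector[Column] = {
    // Outer fold over batches, inner fold over the entries of each batch;
    // the partial column flows from one batch into the next via `acc`.
    val end = batches.foldLeft(Acc(Vector.empty, Vector.empty)) { (acc, batch) =>
      batch.foldLeft(acc)(step)
    }
    // Flush the last column once every batch has been consumed.
    if (end.partial.isEmpty) end.done else end.done :+ end.partial
  }
}
```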

If runtime efficiency is critical for your code, you should definitely benchmark the various possible implementations in order to find a good trade-off between functional-ness and performance.
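One further candidate for such a comparison, not taken from the answer, keeps the iterator but hides the mutation inside it. Flattening the batches into a single entry iterator removes the batch boundaries altogether, and the standard `buffered` method gives a `BufferedIterator` whose `head` lets each column be cut off by peeking at the next entry's id. Columns are produced lazily, one at a time. `Entry` is again a hypothetical type.

```scala
// Hypothetical entry type: `id` identifies the column an entry belongs to.
final case class Entry(id: Int, value: String)

object LazyColumns {
  // Turns an iterator of batches into an iterator of columns. Because the
  // batches are flattened first, a column split across batches stays whole.
  def columns(batches: Iterator[Seq[Entry]]): Iterator[List[Entry]] = {
    val entries = batches.flatMap(_.iterator).buffered

    new Iterator[List[Entry]] {
      def hasNext: Boolean = entries.hasNext

      def next(): List[Entry] = {
        if (!entries.hasNext) throw new NoSuchElementException("no more columns")
        val first = entries.next()
        val column = List.newBuilder[Entry]
        column += first
        // Consume entries while they carry the same id, peeking via `head`.
        while (entries.hasNext && entries.head.id == first.id)
          column += entries.next()
        column.result()
      }
    }
  }
}
```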


EDIT 1: Fixed an error in the sketch, namely that the last sequence of entries stored in buffer was not added to resultBuffer. The same error is present in the OP's code.


EDIT 2: (Addressing ChucK's comment)

curr will take the values entries(0) to entries(entries.size() - 2), that is, the last element will not be processed. One way of addressing this is to append a dummy-element to the iterator, e.g.

(batch.entries ++ List(dummy)).sliding(2).foldLeft ...

This isn't nice and, more importantly, it won't work when batch.entries is empty, because sliding(2) then produces the single window List(dummy). Another solution would be to include next in the accumulator of the inner foldLeft and to process it after the foldLeft has terminated. I haven't worked it out but it looks as if that would make the solution even less appealing.
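The `sliding` edge cases discussed here can be checked directly on a `List` of integers standing in for entry ids: the last element only ever appears in the `next` position, appending a dummy to an empty batch leaves a single one-element window, and an empty input yields no windows at all.

```scala
object SlidingEdgeCases {
  // Each window is (curr, next); the final element is never a `curr`.
  val pairs: List[List[Int]] = List(1, 1, 2, 3).sliding(2).toList

  // Appending a dummy (-1 here) to an empty batch leaves a single one-element
  // window, which a two-element pattern such as Seq(curr, next) does not match.
  val dummyOnly: List[List[Int]] = (List.empty[Int] ++ List(-1)).sliding(2).toList

  // An empty input produces no windows.
  val none: List[List[Int]] = List.empty[Int].sliding(2).toList

  def main(args: Array[String]): Unit = {
    println(pairs)     // List(List(1, 1), List(1, 2), List(2, 3))
    println(dummyOnly) // List(List(-1))
    println(none)      // List()
  }
}
```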

Upvotes: 1
