I'm using Scala to read columns out of our column store, Cassandra. Each column contains a number of entries, n, where n can be between 10 and 20. We read a batch of entries, e.g. 1000 at a time, and have to create columns from the entries; each entry has an ID attached that we can use to group by.
Currently we use an iterator to go through the entries in a batch and detect whether we've reached a new column by comparing the current and previous ID, and we keep reading batches until we're done. We need to store a partial column at the end of each batch, because the rest of that column will be in the next batch. I've put some pseudo code below to demonstrate the basic algorithm we currently use.
How could I do this in a functional way? (If n were constant, this would be a simple problem, as we could set the batch size appropriately.)
Pseudo code:
    val resultBuffer   // collects all columns
    val columnBuffer   // collects entries for current column
    var currentId      // id of current column
    while (batchIterator.hasNext) {
      val batch = batchIterator.next()
      val entryIterator = batch.entries.iterator
      while (entryIterator.hasNext) {
        val entry = entryIterator.next()
        if (entry.id != currentId) {
          currentId = entry.id
          resultBuffer += columnBuilder(columnBuffer)
          columnBuffer.clear()
          columnBuffer += entry
        } else {
          columnBuffer += entry
        }
      }
    }
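For reference, here is a minimal runnable sketch of the same imperative loop. It assumes a hypothetical `Entry(id, value)` case class, batches given as an `Iterator[Seq[Entry]]`, and columns represented as `Vector[Entry]` in place of `columnBuilder`. Unlike the pseudo code, it does not emit an empty column for the very first entry, and it flushes the final column after the loop.

```scala
import scala.collection.mutable

// Hypothetical entry type standing in for the rows read from Cassandra.
final case class Entry(id: Int, value: String)

object ImperativeGrouping {
  // Groups consecutive entries with equal ids into columns. columnBuffer
  // lives outside the batch loop, so a column split across two batches is
  // completed while reading the next batch.
  def group(batches: Iterator[Seq[Entry]]): Vector[Vector[Entry]] = {
    val resultBuffer = mutable.ArrayBuffer.empty[Vector[Entry]]
    val columnBuffer = mutable.ArrayBuffer.empty[Entry]
    var currentId: Option[Int] = None // no column has been started yet
    while (batches.hasNext) {
      val entryIterator = batches.next().iterator
      while (entryIterator.hasNext) {
        val entry = entryIterator.next()
        if (!currentId.contains(entry.id)) {
          // A new column starts: emit the previous one (skipped for the first entry).
          if (columnBuffer.nonEmpty) resultBuffer += columnBuffer.toVector
          columnBuffer.clear()
          currentId = Some(entry.id)
        }
        columnBuffer += entry
      }
    }
    // The last column is never closed by an id change, so flush it here.
    if (columnBuffer.nonEmpty) resultBuffer += columnBuffer.toVector
    resultBuffer.toVector
  }
}
```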
Here is a sketch of a more functional implementation that uses sliding(2) to compare each entry with its successor:
    val resultBuffer // collects all columns
    batchIterator.foreach { batch =>
      val buffer =
        batch.entries.sliding(2).foldLeft(new ColumnBuffer) { case (buffer, Seq(curr, next)) =>
          if (curr.id != next.id) {
            resultBuffer += columnBuilder(buffer :+ curr /* Append curr to buffer */)
            new ColumnBuffer
          } else
            buffer :+ curr /* Return buffer with curr added */
        }
      if (buffer.nonEmpty) resultBuffer += columnBuilder(buffer)
    }
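The core test of the sketch, that a column ends wherever two neighbouring entries carry different IDs, can be checked in isolation. The following is a minimal sketch assuming a hypothetical `Entry(id, value)` case class; rather than a `foldLeft`, it uses `sliding(2)` only to find the boundary positions and then slices the batch, so the last entry is not lost. The last slice may be a partial column that continues in the next batch.

```scala
// Hypothetical entry type standing in for the rows read from Cassandra.
final case class Entry(id: Int, value: String)

object SlidingBoundaries {
  // Indices at which a new column starts: position 0, plus every position
  // whose id differs from its left neighbour.
  def columnStarts(entries: Vector[Entry]): Vector[Int] =
    if (entries.isEmpty) Vector.empty
    else
      0 +: entries.sliding(2).zipWithIndex.collect {
        case (Seq(curr, next), i) if curr.id != next.id => i + 1
      }.toVector

  // Cuts one batch into runs of equal ids using the start indices above.
  def splitBatch(entries: Vector[Entry]): Vector[Vector[Entry]] = {
    val starts = columnStarts(entries)
    val ends = starts.drop(1) :+ entries.size
    starts.zip(ends).map { case (from, until) => entries.slice(from, until) }
  }
}
```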
In the foldLeft sketch, the only object that is "global" and thus has to be mutable is resultBuffer. We could even get rid of that by including it as another accumulator in the inner foldLeft and by replacing the outer foreach with another foldLeft.
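A minimal sketch of that fully immutable variant follows, assuming a hypothetical `Entry(id, value)` case class and columns as `Vector[Entry]`. To keep it short, the inner fold runs over single entries instead of `sliding(2)` windows, comparing each entry with the last one in the open column. Because the open column is part of the accumulator of the outer `foldLeft`, a column that spans a batch boundary is completed in the next batch.

```scala
// Hypothetical entry type standing in for the rows read from Cassandra.
final case class Entry(id: Int, value: String)

object FoldGrouping {
  // Accumulator: finished columns plus the column currently being built.
  final case class Acc(done: Vector[Vector[Entry]], current: Vector[Entry])

  // Inner step: close the open column when the id changes, else extend it.
  private def step(acc: Acc, entry: Entry): Acc =
    acc.current.lastOption match {
      case Some(prev) if prev.id != entry.id =>
        Acc(acc.done :+ acc.current, Vector(entry))
      case _ =>
        acc.copy(current = acc.current :+ entry) // same column, or first entry
    }

  // Outer foldLeft over batches replaces the foreach; no mutable state remains.
  def group(batches: Iterator[Seq[Entry]]): Vector[Vector[Entry]] = {
    val acc = batches.foldLeft(Acc(Vector.empty, Vector.empty)) { (outer, batch) =>
      batch.foldLeft(outer)(step)
    }
    // Emit the trailing column once all batches have been consumed.
    if (acc.current.isEmpty) acc.done else acc.done :+ acc.current
  }
}
```

Since `Iterator.foldLeft` consumes the batches lazily, only the accumulator is held in memory, not all batches at once.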
If runtime efficiency is critical for your code, you should definitely benchmark the various possible implementations in order to find a good trade-off between functional style and performance.
EDIT 1: Fixed an error in the sketch, namely that the last sequence of entries stored in buffer was not added to resultBuffer. The same error is already present in the OP's code.
EDIT 2 (addressing ChucK's comment): curr will take the values entries(0) to entries(entries.size - 2), that is, the last element will never be processed as curr. One way of addressing this is to append a dummy element to the entries, e.g.

    (batch.entries ++ List(dummy)).sliding(2).foldLeft ...

This isn't nice and, more importantly, it won't work when batch.entries is empty, because sliding(2) then produces the single window List(dummy). Another solution would be to include next in the accumulator of the inner foldLeft and to process it after the foldLeft has terminated. I haven't worked it out, but it looks as if that would make the solution even less appealing.
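One possible way of working out that second option, per batch only like the original sketch, is shown below. It assumes a hypothetical `Entry(id, value)` case class and immutable `Vector` columns instead of `ColumnBuffer`. The accumulator carries the most recent `next` as `pending`; after the fold, `pending` is appended to the open column, so the last element is no longer dropped. Windows of size 1 (a one-entry batch) and empty windows are handled explicitly.

```scala
// Hypothetical entry type standing in for the rows read from Cassandra.
final case class Entry(id: Int, value: String)

object SlidingWithCarry {
  // Accumulator: finished columns, the open column, and the latest `next`,
  // i.e. the element that `curr` has not reached yet.
  final case class Acc(done: Vector[Vector[Entry]], buffer: Vector[Entry], pending: Option[Entry])

  def groupBatch(entries: Seq[Entry]): Vector[Vector[Entry]] = {
    val acc = entries.sliding(2).foldLeft(Acc(Vector.empty, Vector.empty, None)) {
      case (Acc(done, buffer, _), Seq(curr, next)) =>
        if (curr.id != next.id) Acc(done :+ (buffer :+ curr), Vector.empty, Some(next))
        else Acc(done, buffer :+ curr, Some(next))
      case (acc, Seq(single)) =>
        acc.copy(pending = Some(single)) // one-entry batch: a single window of size 1
      case (acc, _) =>
        acc // empty window: nothing to do
    }
    // Process the element left over in the accumulator after the fold.
    val lastColumn = acc.buffer ++ acc.pending
    if (lastColumn.isEmpty) acc.done else acc.done :+ lastColumn
  }
}
```

The extra `pending` field and the post-processing step illustrate the added complexity mentioned above.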