Reputation: 14655
Suppose I have an Iterator[A] (of infinite size) and I want to get an Iterator[B] from it in which some consecutive values of type A are aggregated.
For example, I have a sequence of strings:
Iterator(
"START",
"DATA1",
"DATA2",
"DATA3",
"START",
"DATA1",
"DATA2",
//.. 10^10 more records
)
I want to join the strings from each START up to, but excluding, the next START, i.e. write a parser that yields:
Iterator(
"START DATA1 DATA2 DATA3",
"START DATA1 DATA2",
//.. 10^10 / 5 more records
)
I know how to do this imperatively, but I want to accomplish it with Scala higher-order functions. Any ideas?
PS: see the EIP Aggregator: http://camel.apache.org/aggregator2.html.
Upvotes: 3
Views: 603
Reputation: 2717
Well, an infinite stream changes things rather dramatically. Assuming I understand the rest of your situation, this should work:
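def aggregate(it: Iterator[String]): Iterator[String] = new Iterator[String] {
  if (it.hasNext) it.next() // drop the leading "START"; next() re-adds it to each group
  def hasNext: Boolean = it.hasNext
  // takeWhile also consumes the "START" that ends the group, which is what drops
  // the delimiter here (this relies on how Iterator.takeWhile is implemented).
  def next(): String = "START " + it.takeWhile(_ != "START").mkString(" ")
}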
Then you can use it like this:
val i = aggregate(yourStream.iterator)
i.take(20).foreach(println) // or whatever
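This answer relies on takeWhile also swallowing the "START" that ends each group. The Scala Iterator documentation warns against using an iterator again after calling a method such as takeWhile on it, so that behaviour is an implementation detail rather than a guarantee. A minimal sketch that peeks at the next element with a BufferedIterator (from Iterator.buffered) instead, assuming every group begins with the literal "START"; aggregateBuffered is a hypothetical name:

```scala
// Hypothetical helper: groups "START"-delimited records by peeking with
// BufferedIterator.head instead of relying on takeWhile's side effects.
// Assumes the input begins with "START".
def aggregateBuffered(underlying: Iterator[String]): Iterator[String] =
  new Iterator[String] {
    private val it: BufferedIterator[String] = underlying.buffered

    def hasNext: Boolean = it.hasNext

    def next(): String = {
      val group = scala.collection.mutable.ListBuffer(it.next()) // the "START" opening this group
      // head looks at the next element without consuming it, so the
      // following group's "START" stays in the iterator.
      while (it.hasNext && it.head != "START")
        group += it.next()
      group.mkString(" ")
    }
  }
```

Because next() reads only up to the following "START", this also works on an infinite source, for example aggregateBuffered(Iterator.continually(List("START", "A", "B")).flatten).take(2).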
Upvotes: 5
Reputation: 9374
With Streams:
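object Iter {
  def main(args: Array[String]): Unit = {
    val es = List("START", "DATA1", "DATA2", "START", "DATA1", "START")
    val bit = batched(es.iterator, "START")
    println(bit.head)      // List(START, DATA1, DATA2)
    println(bit.tail.head) // List(START, DATA1)
  }

  // Lazily splits `it` into batches, each beginning with `start`.
  // takeWhile also consumes the `start` element that ends each batch.
  def batched[T](it: Iterator[T], start: T): Stream[List[T]] = {
    def nextBatch(): Stream[List[T]] =
      (it takeWhile { _ != start }).toList match {
        case Nil if it.hasNext => nextBatch()  // skip the empty batch before the first `start`
        case Nil               => Stream.empty // input exhausted: end the stream
        case es                => Stream.cons(start :: es, nextBatch()) // tail is evaluated lazily
      }
    nextBatch()
  }
}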
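Stream is deprecated as of Scala 2.13 in favour of LazyList. A sketch of the same batched idea on LazyList, assuming Scala 2.13 or later (batchedLazy is a hypothetical name); it returns an empty LazyList once the iterator is exhausted, so finite input can be traversed completely:

```scala
// Hypothetical LazyList version of `batched` (Scala 2.13+).
// Like the Stream version, it relies on takeWhile also consuming the
// `start` element that ends each batch.
def batchedLazy[T](it: Iterator[T], start: T): LazyList[List[T]] = {
  def nextBatch(): LazyList[List[T]] =
    it.takeWhile(_ != start).toList match {
      case Nil if it.hasNext => nextBatch()    // empty batch before the first `start`: skip it
      case Nil               => LazyList.empty // iterator exhausted: stop
      case es                => (start :: es) #:: nextBatch() // recursive call deferred by #::
    }
  nextBatch()
}
```

Since #:: defers the recursive nextBatch() call, an infinite iterator is only read as far as the batches you actually request, e.g. with take(3).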
Upvotes: 0
Reputation: 40461
If you want a functional solution, you should use Streams rather than iterators (streams are immutable). Here's one possible approach:
def aggregate(strs: Stream[String]): Stream[String] = {
  aggregateRec(strs)
}

// Drops the leading "START", then emits one joined string per "START"-delimited group.
def aggregateRec(strs: Stream[String]): Stream[String] = {
  val tail = strs.drop(1)
  if (tail.nonEmpty) {
    val (str, rest) = accumulate(tail)
    Stream.cons(str, aggregateRec(rest)) // the recursive call is evaluated lazily
  }
  else Stream.empty
}

// Joins everything up to the next "START"; returns the group and the remaining stream.
def accumulate(strs: Stream[String]): (String, Stream[String]) = {
  val first = "START " + strs.takeWhile(_ != "START").mkString(" ")
  val rest = strs.dropWhile(_ != "START")
  (first, rest)
}
It works as expected:
val strs = Stream( "START", "1", "2", "3", "START", "A", "B" )
val strs2 = aggregate( strs )
strs2 foreach println
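The same recursion ported to LazyList, assuming Scala 2.13 or later (aggregateLazy is a hypothetical name). Because #:: evaluates the recursive call lazily, taking a prefix of the result from an infinite input only forces the groups that are needed:

```scala
// Hypothetical port of aggregateRec/accumulate to LazyList (Scala 2.13+).
// Assumes the input starts with "START"; each group's "START" is re-added when joining.
def aggregateLazy(strs: LazyList[String]): LazyList[String] = {
  val tail = strs.drop(1) // skip the "START" that opens the current group
  if (tail.isEmpty) LazyList.empty
  else {
    val group = "START " + tail.takeWhile(_ != "START").mkString(" ")
    val rest  = tail.dropWhile(_ != "START") // begins at the next "START", if any
    group #:: aggregateLazy(rest)            // recursive call is deferred by #::
  }
}
```

One caveat for 10^10 records: a lazy list memoizes every element it has evaluated, so avoid keeping a reference to its head (for example in a val) while traversing it, or the whole evaluated prefix stays in memory.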
Upvotes: 6
Reputation: 60006
You could try it with a fold:
val ls = List(
"START",
"DATA1",
"DATA2",
"DATA3",
"START",
"DATA1",
"DATA2"
)
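// foldLeft is the non-deprecated name for /: and builds the groups in reverse order.
// A fold consumes its whole input, so this only works on finite collections, and it
// assumes the first element is "START" (otherwise acc.head fails on an empty list).
ls.foldLeft(List.empty[List[String]]) { (acc, elem) =>
  if (elem == "START")
    List(elem) :: acc              // new head list
  else
    (elem :: acc.head) :: acc.tail // prepend to current head list
}.map(_.reverse.mkString(" ")).reverse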
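A fold like this is strict: it must read the entire input before producing anything, so it cannot handle the infinite iterator from the question. A sketch that swaps the fold for scanLeft, which Iterator evaluates lazily, carrying the current group plus any just-completed group in the accumulator; a None appended to the input marks the end so the last group is flushed (aggregateScan is a hypothetical name):

```scala
// Hypothetical helper: a lazy, scanLeft-based variant of the fold above.
// State = (current group in reverse order, group completed by this element, if any).
def aggregateScan(it: Iterator[String]): Iterator[String] =
  (it.map(Option(_)) ++ Iterator.single(None)) // None marks the end of the input
    .scanLeft((List.empty[String], Option.empty[List[String]])) {
      case ((current, _), Some("START")) =>
        // A new group begins; emit the previous one if there was one.
        (List("START"), if (current.isEmpty) None else Some(current))
      case ((current, _), Some(s)) =>
        (s :: current, None) // prepend to the current group
      case ((current, _), None) =>
        (Nil, if (current.isEmpty) None else Some(current)) // flush the last group
    }
    .collect { case (_, Some(done)) => done.reverse.mkString(" ") }
```

Only the group currently being built is held in memory, and a group is emitted as soon as the next "START" (or the end of input) arrives.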
Upvotes: 1