Reputation: 709
I'm totally new to Scala and Akka. I have a simple RunnableFlow:
Source -> Flow (do some transformation) -> Sink.runForeach
Now I want something like this:
Source -> Flow1 (do some transformation) -> Flow2 (do some transformation) -> Sink.runForeach
But Flow2 should wait until 100 elements from Flow1 are available and then transform these 100 elements to a new element (which needs all 100 elements from Flow1) and give this new element to the Sink.
I did some research and found "Explicit user defined buffers", but I don't understand how I can access all 100 elements from Flow1 in Flow2 and do some transformation with them. Can someone explain it, or even better, post a small, simple example? Or both?
Upvotes: 3
Views: 4236
Reputation: 17923
Akka Defined Collection
If you don't mind using a collection type chosen by Akka, you can use the grouped function, which emits the elements in batches of a fixed size (the last batch may be smaller):
// bufferSize is the batch size, e.g. 100 for the question above
val stream = Source(1 to 100).via(Flow[Int].grouped(bufferSize))
  .runWith(Sink foreach println)
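The question also asks how to turn each batch into a single new element. Here is a minimal sketch of that idea using plain Scala collections rather than Akka, so it runs without a materializer. The names GroupedSketch, combine and batchResults are made up for illustration, and summing is only a stand-in for the real transformation.

```scala
object GroupedSketch {
  // Hypothetical stand-in for "transform 100 elements into one new element".
  def combine(batch: Seq[Int]): Int = batch.sum

  // Iterator.grouped chunks the input into batches of batchSize;
  // the final batch may be smaller if the input runs out.
  def batchResults(elements: Iterable[Int], batchSize: Int): List[Int] =
    elements.iterator.grouped(batchSize).map(combine).toList

  def main(args: Array[String]): Unit =
    batchResults(1 to 250, 100).foreach(println)
}
```

In the Akka pipeline the same shape should be achievable by following grouped with a map stage that receives the whole batch, e.g. Flow[Int].grouped(100).map(combine).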
User Defined Collection
If you want to control the type of collection used for your buffer, e.g. a Seq or Array:
import scala.reflect.ClassTag
type MyCollectionType[X] = Array[X]
// Creating a generic Array requires a ClassTag for the element type
def emptyMyCollection[X : ClassTag] : MyCollectionType[X] = Array.empty[X]
Then you can perform this operation with two Flows. The first Flow executes a scan to build up a sequence of elements:
val bufferSize = 10
// Append to the buffer, starting over with an empty one once it is full
def appendToMyCollection[X : ClassTag](coll : MyCollectionType[X], i : X) : MyCollectionType[X] =
  (if(coll.size < bufferSize) coll else emptyMyCollection[X]) :+ i
val buffer : Flow[Int, MyCollectionType[Int], _] =
  Flow[Int].scan[MyCollectionType[Int]](emptyMyCollection[Int]){
    (coll, i) => appendToMyCollection(coll, i)
  }
The second Flow is a filter for a sequence with just the right size (i.e. "goldiLocks"):
val goldiLocks : Flow[MyCollectionType[Int], MyCollectionType[Int],_] =
Flow[MyCollectionType[Int]].filter(_.size == bufferSize)
These two Flows can be combined to produce a Stream which will generate the desired collection type:
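val stream = Source(1 to 100).via(buffer)
  .via(goldiLocks)
  // println on an Array prints only its reference, so format the contents
  .runWith(Sink.foreach[MyCollectionType[Int]](coll => println(coll.mkString(", "))))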
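To see what the scan and filter stages each emit, here is a rough sketch of the same logic on plain Scala collections, with no Akka required. ScanFilterSketch, append, intermediate and full are illustrative names, Vector stands in for the buffer type, and the buffer size is reduced to 3 to keep the output short.

```scala
object ScanFilterSketch {
  val bufferSize = 3

  // Same rule as the scan step: append, but start over once the buffer is full.
  def append(coll: Vector[Int], i: Int): Vector[Int] =
    (if (coll.size < bufferSize) coll else Vector.empty[Int]) :+ i

  // scanLeft yields the initial empty buffer and every intermediate buffer.
  val intermediate: List[Vector[Int]] =
    (1 to 7).toList.scanLeft(Vector.empty[Int])(append)

  // Keeping only the full buffers mirrors the filter step.
  val full: List[Vector[Int]] = intermediate.filter(_.size == bufferSize)

  def main(args: Array[String]): Unit = {
    intermediate.foreach(println)
    full.foreach(println)
  }
}
```

Note that the trailing element 7 never reaches a full buffer, so it is dropped. With this approach, elements left over at the end of the stream are lost unless the element count is a multiple of bufferSize.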
Upvotes: 9