Reputation: 11552
My data from an unbounded stream source looks something like this:
value1,
value3,
...,
START,
value155,
...,
value202,
END,
...,
value234,
value235,
...
START,
value298,
...,
value310,
END,
...,
value377,
...
Based on Akka-Streams collecting data (Source -> Flow -> Flow (collect) -> Sink), I came up with the following code using Akka Streams to accumulate messages between a fixed "start key" and "end key" (here START and END):
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
val list = List("d1", "d2", "d3", "start", "d4", "d5", "d6", "d7", "end", "d9", "d10", "start", "d11", "d12", "d13", "d14", "end", "d15")
val source = Source(list) // actual Source is unbound, has many more items between "start" and "end"; also the cycle of "start" and "end" repeats
implicit val system = ActorSystem("collection-accumulator")
implicit val materializer = ActorMaterializer()
Source(list)
.scan(Seq.empty[String]) { (coll, s) =>
if(s.equals("start") || coll.head.equals("start"))
coll :+ s
else
Seq.empty[String] // return empty Seq unless new element == "start"
// or first element of Seq == "start"
}
.filter(_.last.equals("end"))
.to(Sink.foreach(println)).run()
Alas, nothing gets past the filter at all! No output.
Replacing coll.head.equals and _.last.equals with .contains returns a result, but of course it is not correct: once "end" has been appended it stays in the collection, so every later element passes the filter.
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
val list = List("d1", "d2", "d3", "start", "d4", "d5", "d6", "d7", "end", "d9", "d10", "start", "d11", "d12", "d13", "d14", "end", "d15")
val source = Source(list) // actual Source is unbound, has many more items between "start" and "end"; also the cycle of "start" and "end" repeats
implicit val system = ActorSystem("collection-accumulator")
implicit val materializer = ActorMaterializer()
Source(list)
.scan(Seq.empty[String]) { (coll, s) =>
if(s.equals("start") || coll.contains("start"))
coll :+ s
else
Seq.empty[String]
}
.filter(_.contains("end"))
.to(Sink.foreach(println)).run()
As expected, the output is:
List(start, d4, d5, d6, d7, end)
List(start, d4, d5, d6, d7, end, d9)
List(start, d4, d5, d6, d7, end, d9, d10)
Any suggestions on how to solve this? I suspect some "materialization" needs to be forced along the way, or I might just run into some lazy eval/actor/async issue that I am not aware of. Thanks in advance!
(at the time of writing, https://doc.akka.io/docs/akka/current/stream/stream-quickstart.html has a ready-made ScalaFiddle for quickly playing around with Akka Streams)
Edit:
Clarification of "unbound": I meant that not only is the list of messages unbounded, but the "START" and "END" cycles also repeat. I have updated the example accordingly.
Upvotes: 0
Views: 490
Reputation: 26911
An alternative might be to use groupedWeightedWithin(maxWeight: Long, maxDuration: FiniteDuration), which groups elements by time and by a weight function. The trick is to assign zero weight to every element except "end", and to make the "end" element heavy enough to equal or exceed maxWeight:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import scala.concurrent.Await
import scala.concurrent.duration.DurationInt
implicit val system = ActorSystem("collection-accumulator")
implicit val materializer = ActorMaterializer()
val source = Source(list) // actual Source is unbound, has many more items between "start" and "end"
val maxDuration = 120.seconds // put an arbitrarily high duration here
val resultFuture = Source(list)
// accumulates everything up until and including "end" element
// essentially splits at "end" elements
.groupedWeightedWithin(1L, maxDuration)({
case "end" => 1L
case _ => 0
})
.map(accumulated =>
accumulated
.dropWhile(_ != "start") // drop everything till "start" element
.drop(1) // drop "start"
.takeWhile(_ != "end") // take everything until "end" is seen
)
// Run and accumulate into seq - result will be Seq[Seq[String]]
.runWith(Sink.seq)
println(Await.result(resultFuture, 1.second)) // Vector(Vector(d4, d5, d6, d7), Vector(d11, d12, d13))
This allows capturing multiple "start"-...-"end" sequences without re-materializing the stream (and it works just as well with exactly one).
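To see what the map step does to a single emitted group, here is a small plain-collections sketch with no Akka involved. The object name TrimGroupSketch and the helper trim are made up for illustration; the body is the same dropWhile / drop / takeWhile chain as in the answer.

```scala
object TrimGroupSketch {
  // Hypothetical helper: keeps only what lies strictly between "start" and "end".
  def trim(group: Seq[String]): Seq[String] =
    group
      .dropWhile(_ != "start") // skip everything before the "start" marker
      .drop(1)                 // skip "start" itself
      .takeWhile(_ != "end")   // stop just before "end"

  def main(args: Array[String]): Unit = {
    println(trim(Vector("d1", "d2", "start", "d4", "d5", "end"))) // Vector(d4, d5)
    println(trim(Vector("d15")))                                   // Vector()
  }
}
```

Note that a group without any "start" trims to an empty collection, so depending on the input it may be worth filtering empty groups out downstream.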
Upvotes: 0
Reputation: 22439
Here's one way that first transforms the Source elements into sliding 2-element lists, drops the pre-"start" lists, then takes the pre-"end" lists, and finally conditionally captures the list elements using mapConcat:
Source(list)
.sliding(2, 1)
.dropWhile(_(0) != "start")
.takeWhile(_(0) != "end")
.mapConcat( ls => if (ls(1) != "end") List(ls(0)) else ls )
.runForeach(println)
// start
// d4
// d5
// d6
// d7
// end
To capture the elements in a collection, just replace runForeach(println) with runWith(Sink.seq[String]).
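For completeness, a minimal sketch of the Sink.seq variant as a standalone program, assuming the Akka 2.5-style setup with ActorMaterializer used in the question. The object name SlidingCaptureSketch and the method capture are made up for illustration, and the input list is copied from the question.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl._
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object SlidingCaptureSketch {
  // Same pipeline as above, but materialized into a Future of the collected elements.
  def capture(list: List[String])(implicit mat: Materializer): Future[Seq[String]] =
    Source(list)
      .sliding(2, 1)
      .dropWhile(_(0) != "start")
      .takeWhile(_(0) != "end")
      .mapConcat(ls => if (ls(1) != "end") List(ls(0)) else ls)
      .runWith(Sink.seq[String])

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("collection-accumulator")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val list = List("d1", "d2", "d3", "start", "d4", "d5", "d6", "d7", "end", "d9", "d10",
      "start", "d11", "d12", "d13", "d14", "end", "d15")

    try println(Await.result(capture(list), 3.seconds)) // Vector(start, d4, d5, d6, d7, end)
    finally system.terminate()
  }
}
```

Because takeWhile completes the stream at the first "end", only the first "start"..."end" run is captured, even though the list contains a second one.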
Upvotes: 0
Reputation: 19497
One approach is to use statefulMapConcat:
val source =
Source(List("d1", "d2", "d3", "start", "d4", "d5", "d6", "d7", "end", "d9", "d10"))
source.statefulMapConcat { () =>
var started = false
var ended = false
x =>
if (x == "start") {
started = true
Nil
} else if (x == "end") {
ended = true
Nil
} else if (started && !ended) {
x :: Nil
} else {
Nil
}
}.runForeach(println)
The above code prints the following:
d4
d5
d6
d7
If you want to accumulate the elements between "start" and "end" instead of individually printing those elements on a streaming basis, you can adjust the above snippet to do so. Alternatively, take a look at AccumulateWhileUnchanged from the Akka Streams Contrib project.
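One possible adjustment, as a rough sketch rather than a definitive implementation: keep a buffer as the state and emit it as one element whenever "end" arrives, so that repeated "start"/"end" cycles each produce their own group. The names AccumulateBetweenMarkers and betweenMarkers are made up for illustration, the setup assumes the Akka 2.5-style ActorMaterializer used in the question, and the input list is the one from the question.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import scala.concurrent.Await
import scala.concurrent.duration._

object AccumulateBetweenMarkers {
  // Hypothetical helper: builds a fresh stateful function for each materialization.
  def betweenMarkers(): String => List[Vector[String]] = {
    // None while outside a "start"/"end" pair, Some(buffer) while inside one
    var buffer: Option[Vector[String]] = None

    val step: String => List[Vector[String]] = {
      case "start" =>
        buffer = Some(Vector.empty) // open (or restart) a group
        Nil
      case "end" =>
        val finished = buffer.toList // emit the group, if one was open
        buffer = None
        finished
      case x =>
        buffer = buffer.map(_ :+ x) // append only while a group is open
        Nil
    }
    step
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("collection-accumulator")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val list = List("d1", "d2", "d3", "start", "d4", "d5", "d6", "d7", "end", "d9", "d10",
      "start", "d11", "d12", "d13", "d14", "end", "d15")

    val groups = Source(list)
      .statefulMapConcat[Vector[String]](() => betweenMarkers())
      .runWith(Sink.seq)

    // Prints: Vector(Vector(d4, d5, d6, d7), Vector(d11, d12, d13, d14))
    try println(Await.result(groups, 3.seconds))
    finally system.terminate()
  }
}
```

In this sketch a group that is still open when the stream completes is dropped, since nothing is emitted until "end" is seen; whether that is acceptable depends on the use case.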
Upvotes: 2