juwalter

Reputation: 11552

Scala collection not materialized inside simple Akka Streams operation

My data from an unbound stream source looks something like this:

value1, 
value3, 
..., 
START, 
value155, 
..., 
value202, 
END, 
...,
value234,
value235, 
...
START, 
value298, 
..., 
value310, 
END, 
...,
value377, 
...

Based on Akka-Streams collecting data (Source -> Flow -> Flow (collect) -> Sink), I came up with the following code using Akka Streams to accumulate messages between a fixed "start key" and "end key" (here START and END):

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._

val list = List("d1", "d2", "d3", "start", "d4", "d5", "d6", "d7", "end", "d9", "d10", "start", "d11", "d12", "d13", "d14", "end", "d15")
val source = Source(list) // actual Source is unbound, has many more items between "start" and "end"; also the cycle of "start" and "end" repeats 

implicit val system = ActorSystem("collection-accumulator")
implicit val materializer = ActorMaterializer()

Source(list)
  .scan(Seq.empty[String]) { (coll, s) => 
    if(s.equals("start") || coll.head.equals("start")) 
      coll :+ s
    else
      Seq.empty[String] // return empty Seq unless new element == "start" 
                        // or first element of Seq == "start" 
  }    
  .filter(_.last.equals("end"))
  .to(Sink.foreach(println)).run()

Alas, nothing gets past the filter at all! No output.

Replacing coll.head.equals and coll.last.equals with .contains does return a result, but it is of course not correct, since "end" is eventually always included:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._

val list = List("d1", "d2", "d3", "start", "d4", "d5", "d6", "d7", "end", "d9", "d10", "start", "d11", "d12", "d13", "d14", "end", "d15")
val source = Source(list) // actual Source is unbound, has many more items between "start" and "end"; also the cycle of "start" and "end" repeats 

implicit val system = ActorSystem("collection-accumulator")
implicit val materializer = ActorMaterializer()

Source(list)
  .scan(Seq.empty[String]) { (coll, s) => 
    if(s.equals("start") || coll.contains("start")) 
      coll :+ s
    else
      Seq.empty[String]
  }    
  .filter(_.contains("end"))
  .to(Sink.foreach(println)).run()

As expected, the output is:

List(start, d4, d5, d6, d7, end)
List(start, d4, d5, d6, d7, end, d9)
List(start, d4, d5, d6, d7, end, d9, d10)

Any suggestions on how to solve this? I suspect some "materialization" needs to be forced along the way, or that I am running into some lazy-eval/actor/async issue I am not aware of. Thanks in advance!
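For what it's worth, the failure of the first variant can be reproduced in plain Scala, with no Akka involved: scan's zero element is Seq.empty, and the very first input ("d1") is not "start", so the || falls through to coll.head on that empty Seq, which throws NoSuchElementException and fails the stream before anything ever reaches the filter (a sketch, using the same function body as above):

```scala
// Same function body as the scan stage above, extracted for testing.
val step: (Seq[String], String) => Seq[String] = { (coll, s) =>
  if (s.equals("start") || coll.head.equals("start")) coll :+ s
  else Seq.empty[String]
}

// Feeding the zero element and the first input throws:
// coll is empty, s is not "start", so coll.head is evaluated.
val failed =
  try { step(Seq.empty[String], "d1"); false }
  catch { case _: NoSuchElementException => true }
// failed == true
```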

(At the time of writing, https://doc.akka.io/docs/akka/current/stream/stream-quickstart.html has a ready-made ScalaFiddle for quickly playing around with Akka Streams.)

Edit:

Clarify "unbound": what I meant is that the stream of messages is not only unbounded, but that the "START"/"END" cycles repeat as well. I have updated the example accordingly.

Upvotes: 0

Views: 490

Answers (3)

J0HN

Reputation: 26911

An alternative might be to use groupedWeightedWithin(maxWeight: Long, maxDuration: FiniteDuration), which groups elements by time and by a weight function. The trick is to assign zero weight to all elements except "end", and to make the "end" element heavy enough to be equal to or greater than maxWeight:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._

import scala.concurrent.Await
import scala.concurrent.duration.DurationInt

implicit val system = ActorSystem("collection-accumulator")
implicit val materializer = ActorMaterializer()

val list = List("d1", "d2", "d3", "start", "d4", "d5", "d6", "d7", "end", "d9", "d10", "start", "d11", "d12", "d13", "d14", "end", "d15") // actual Source is unbounded, with many more items between "start" and "end"

val maxDuration = 120.seconds // use an arbitrarily high duration here

val resultFuture = Source(list)
  // accumulates everything up until and including "end" element
  // essentially splits at "end" elements
  .groupedWeightedWithin(1L, maxDuration)({
    case "end" => 1L
    case _ => 0
  })
  .map(accumulated => 
    accumulated
     .dropWhile(_ != "start") // drop everything till "start" element
     .drop(1)                 // drop "start"
     .takeWhile(_ != "end")   // take everything until "end" is seen
  ) 
  // Run and accumulate into seq - result will be Seq[Seq[String]]
  .runWith(Sink.seq)

println(Await.result(resultFuture, 1.second))
// Vector(Vector(d4, d5, d6, d7), Vector(d11, d12, d13, d14), Vector())

This captures multiple "start"-..."end" sequences without re-materializing the stream (and works just fine with exactly one).

Upvotes: 0

Leo C

Reputation: 22439

Here's one way that first transforms the Source elements into sliding 2-element lists, drops the pre-"start" lists followed by taking the pre-"end" lists, and then conditionally captures the list elements using mapConcat:

Source(list)
  .sliding(2, 1)
  .dropWhile(_(0) != "start")
  .takeWhile(_(0) != "end")
  .mapConcat( ls => if (ls(1) != "end") List(ls(0)) else ls )
  .runForeach(println)

// start
// d4
// d5
// d6
// d7
// end

To capture the elements in a collection, just replace runForeach(println) with runWith(Sink.seq[String]).

Upvotes: 0

Jeffrey Chung

Reputation: 19497

One approach is to use statefulMapConcat:

val source =
  Source(List("d1", "d2", "d3", "start", "d4", "d5", "d6", "d7", "end", "d9", "d10"))

source.statefulMapConcat { () =>
  var started = false
  var ended = false

  x =>
    if (x == "start") {
      started = true
      Nil
    } else if (x == "end") {
      ended = true
      Nil
    } else if (started && !ended) {
      x :: Nil
    } else {
      Nil
    }
}.runForeach(println)

The above code prints the following:

d4
d5
d6
d7

If you want to accumulate the elements between "start" and "end" instead of individually printing those elements on a streaming basis, you can adjust the above snippet to do so. Alternatively, take a look at AccumulateWhileUnchanged from the Akka Streams Contrib project.
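As a sketch of that adjustment (plain Scala mirroring statefulMapConcat's contract of a stateful () => In => Iterable[Out]; the accumulator name and the shortened input list are illustrative): buffer elements after "start" and emit the completed group only when "end" arrives, so each start/end cycle yields exactly one collection:

```scala
// Stateful factory: each call returns a fresh function that buffers
// elements between "start" and "end" and emits the finished group.
def accumulator(): String => List[Seq[String]] = {
  var buffer = Vector.empty[String]
  var inside = false

  {
    case "start"         => inside = true; buffer = Vector.empty; Nil
    case "end" if inside => inside = false; List(buffer)
    case x if inside     => buffer :+= x; Nil
    case _               => Nil // ignore everything outside a cycle
  }
}

val input = List("d1", "start", "d4", "d5", "end", "d9", "start", "d11", "end")
val groups = input.flatMap(accumulator())
// groups == List(Vector(d4, d5), Vector(d11))
```

Plugged into the stream as source.statefulMapConcat(() => accumulator()).runForeach(println), this should print one collection per start/end cycle instead of individual elements.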

Upvotes: 2
