user3139545

Reputation: 7394

Test groupBy with akka-stream-testkit

I'm not able to get the following code working. It compiles, but I get the error "Expected OnNext(_), yet no element signaled during 3 seconds".

Why am I getting this error, and what do I need to do to test the type of flow below?

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Keep
import akka.stream.testkit.scaladsl.{TestSink, TestSource}
import org.scalatest.FunSuite

class GeneralTests extends FunSuite {

  implicit val system = ActorSystem("Test-System")
  implicit val materializer = ActorMaterializer()

  case class Wid(id: Int, v: String)

  test("Substream with folds") {
    val (pub, sub) = TestSource.probe[Wid]
      .groupBy(Int.MaxValue, _.id)
      .fold("")((a: String, b: Wid) => a + b.v)
      .grouped(2)
      .mergeSubstreams
      .toMat(TestSink.probe[Seq[String]])(Keep.both)
      .run()

    sub.request(5)
    pub.sendNext(Wid(1, "1"))
    pub.sendNext(Wid(2, "2"))
    sub.expectNext()
    pub.sendNext(Wid(3, "3"))
    pub.sendNext(Wid(4, "4"))
    pub.sendNext(Wid(5, "5"))
    sub.expectNext()
    sub.expectNext()
  }
}

Update: Explaining the real stream I am trying to construct. My use case is a continuous (never-ending) stream of elements that I need to split into substreams, which I can then process in parallel for each incoming message. This is what I have right now, but the groupBy is tricky. For example, is there a way to clean up substreams? Since the stream never ends, a large number of substreams may accumulate and need to be cleaned up.

class GeneralTests extends UnitSpec {

  implicit val system = ActorSystem("Test-System")
  implicit val materializer = ActorMaterializer()

  case class Wid(id: Int, v: String)

  val flow = Flow[Wid]
    .map { s ⇒ println(Thread.currentThread().getName() + " ASYNC " + s); s }
    .scan("")((a: String, b: Wid) => a + b.v)
    .sliding(2, step = 1)

  test("Parallel group-by with state") {
    val (pub, sub) = TestSource.probe[Wid]
      .map { s ⇒ println(Thread.currentThread().getName() + " BEFORE " + s); s }
      .groupBy(Int.MaxValue, _.id)
      .via(flow).async
      .mergeSubstreams
      .map { s ⇒ println(Thread.currentThread().getName() + " AFTER " + s); s }
      .toMat(TestSink.probe[Seq[String]])(Keep.both)
      .run()

    sub.request(n = 4)
    pub.sendNext(Wid(1, "1"))
    println(sub.requestNext())
    pub.sendNext(Wid(2, "2"))
    println(sub.requestNext())
    pub.sendNext(Wid(1, "3"))
    println(sub.requestNext())
    pub.sendNext(Wid(2, "4"))
    println(sub.requestNext())
  }
}

However, at some point the async stage runs on the same thread as the stages around it (dispatcher-6 for Wid(1,3)), as shown in the output:

Test-System-akka.actor.default-dispatcher-3 BEFORE Wid(1,1)
Test-System-akka.actor.default-dispatcher-4 ASYNC Wid(1,1)
Test-System-akka.actor.default-dispatcher-3 AFTER Vector(, 1)
Vector(, 1)
Test-System-akka.actor.default-dispatcher-4 BEFORE Wid(2,2)
Test-System-akka.actor.default-dispatcher-2 ASYNC Wid(2,2)
Test-System-akka.actor.default-dispatcher-2 AFTER Vector(, 2)
Vector(, 2)
Test-System-akka.actor.default-dispatcher-6 BEFORE Wid(1,3)
Test-System-akka.actor.default-dispatcher-6 ASYNC Wid(1,3)
Test-System-akka.actor.default-dispatcher-6 AFTER Vector(1, 13)
Vector(1, 13)
Test-System-akka.actor.default-dispatcher-6 BEFORE Wid(2,4)
Test-System-akka.actor.default-dispatcher-2 ASYNC Wid(2,4)
Test-System-akka.actor.default-dispatcher-6 AFTER Vector(2, 24)
Vector(2, 24)

Upvotes: 0

Views: 294

Answers (1)

Stefano Bonetti

Reputation: 9023

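A fold stage emits a single element, its final value, and only once its upstream completes. In your test the publisher (i.e. the upstream) never completes, so no substream ever emits and expectNext() times out. For the fold stage to emit its final value, the publisher needs to complete.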
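As a loose analogy in plain Scala collections (not Akka Streams itself), foldLeft can only produce its result after it has consumed every input element, while scanLeft produces the seed plus one value per element along the way. The object name FoldVsScan and the sample values are purely illustrative.

```scala
object FoldVsScan {
  // Sample inputs, standing in for the `v` fields of a few Wid messages.
  val values: List[String] = List("1", "2", "3")

  // foldLeft yields one result, available only after all input is consumed.
  val folded: String = values.foldLeft("")(_ + _)

  // scanLeft yields the seed and then every intermediate accumulation.
  val scanned: List[String] = values.scanLeft("")(_ + _)
}
```

The scan-based flow in the question's update behaves like the second case, which would explain why it prints values such as Vector(, 1) without the publisher ever completing.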

A valid sequence will look like:

sub.request(5)
pub.sendNext(Wid(1,"1"))
pub.sendNext(Wid(2,"2"))
pub.sendNext(Wid(3,"3"))
pub.sendNext(Wid(4,"4"))
pub.sendNext(Wid(5,"5"))
pub.sendComplete()
sub.expectNext()
sub.expectNext()
sub.expectNext()
sub.expectNext()
sub.expectNext()
sub.expectComplete()
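Put together with the stream from the question, the sequence above might look like the sketch below. It assumes akka-stream and akka-stream-testkit are on the classpath; FoldCompletionSketch and run are hypothetical names, and ActorMaterializer is kept only to match the question (it is deprecated from Akka 2.6, where the implicit ActorSystem is enough). Each of the five ids gets its own substream, so each fold emits one string and grouped(2) flushes it as a one-element group on completion. The order in which merged substreams emit is not guaranteed, so the result is best compared as a set.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Keep
import akka.stream.testkit.scaladsl.{TestSink, TestSource}

object FoldCompletionSketch {
  final case class Wid(id: Int, v: String)

  // Runs the stream once and returns everything the sink probe received.
  def run(): Seq[Seq[String]] = {
    implicit val system: ActorSystem = ActorSystem("Test-System")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val (pub, sub) = TestSource.probe[Wid]
        .groupBy(Int.MaxValue, _.id)
        .fold("")((a: String, b: Wid) => a + b.v)
        .grouped(2)
        .mergeSubstreams
        .toMat(TestSink.probe[Seq[String]])(Keep.both)
        .run()

      sub.request(5)
      (1 to 5).foreach(i => pub.sendNext(Wid(i, i.toString)))
      // Completing the upstream lets every fold emit its accumulated value.
      pub.sendComplete()
      val received = sub.expectNextN(5)
      sub.expectComplete()
      received
    } finally {
      system.terminate()
    }
  }
}
```

Inside a test, the result could be checked with something like FoldCompletionSketch.run().toSet, compared against the five expected one-element groups.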

Upvotes: 1
