Reputation: 899
I wrote a simple test to explore the PartitionWith
graph functionality from akka.stream.contrib
. Here is the code snippet:
class Scratch
extends TestKit(ActorSystem("PartitionWith"))
with WordSpecLike
with ScalaFutures
with Eventually
with Matchers {
"PartitionWith" should {
"split source" in {
val source: Source[Either[Int, String], NotUsed] = Source(List(Left(1), Right("one")))
val leftHeadSink = Sink.head[Int]
val rightHeadSink = Sink.head[String]
val flow = RunnableGraph.fromGraph(GraphDSL.create(source, leftHeadSink, rightHeadSink)((_, _, _)) {
implicit builder: GraphDSL.Builder[(NotUsed, Future[Int], Future[String])] => (s, l, r) =>
import GraphDSL.Implicits._
val pw = builder.add(PartitionWith.apply[Either[Int, String], Int, String](identity))
s ~> pw.in
pw.out0 ~> l.in
pw.out1 ~> r.in
ClosedShape
})
val event = flow.run()
event._2.futureValue shouldBe 1 // first check
event._3.futureValue shouldBe "one" // second check
}
When I run the above test, it throws me this error:
The future returned an exception of type: java.util.NoSuchElementException, with message: head of empty stream.
org.scalatest.exceptions.TestFailedException: The future returned an exception of type: java.util.NoSuchElementException, with message: head of empty stream.
It seems like it fails in the second check because the rightHeadSink
is empty. I'm wondering if the Right("one")
in Source(List(Left(1), Right("one")))
is never processed??
How do I fix this?
Upvotes: 0
Views: 380
Reputation: 9023
As the first Sink.head
completes, it seems like the whole stream is completed, so the second sink fails to retrieve its element.
Try to test with 2 Sink.seq
s instead of 2 Sink.head
s to avoid a precocious termination of your stream.
Upvotes: 0