Alexander Kahoun

Reputation: 2488

How to test an akka stream closed shape runnable graph with an encapsulated source and sink

I created an Akka stream that has a process function and an error handler function passed into it. The Source and Sink are fully encapsulated in a ClosedShape RunnableGraph. My intention is to pass an item to the parent class and run it through the flow. This all seems to work until I get to testing.

I'm using ScalaTest and appending to Lists inside the process function and the error handler function. I'm also randomly generating errors so that some items flow to the error handler. If I pass 100 items to the parent class, I expect the number of items collected by the error handler plus the number collected by the process function to add up to 100. The problem is that, because the Source and Sink are fully encapsulated, I have no clear way to tell the test to wait, so it reaches the assertion/should statements before all items have been processed by the stream. I've created this gist to describe the stream.

Here's an example test for the above gist:

import akka.actor._
import akka.stream._
import akka.testkit._
import org.scalatest._

class TestSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
    with WordSpecLike with Matchers with BeforeAndAfterAll {
  def this() = this(ActorSystem("TestSpec"))

  override def afterAll = {
    Thread.sleep(500)
    mat.shutdown()
    TestKit.shutdownActorSystem(system)
  }

  implicit val mat = ActorMaterializer(ActorMaterializerSettings(system).withDebugLogging(true).withFuzzing(true))

  "TestSpec" must {
    "handle messages" in {
      val testStream = new Testing()                                                     // For Testing class see gist: https://gist.github.com/leftofnull/3e4c2a6b18fe71d219b6
      (1 to 100).foreach(n => testStream.processString(s"${n}${n * 2}${n * 4}${n * 8}")) // Give it 100 strings to chew on

      testStream.errors.size should not be (0)                                           // passes
      testStream.processed.size should not be (0)                                        // passes
      (testStream.processed.size + testStream.errors.size) should be (100)               // fails due to checking before all items are processed
    }
  }
}

Upvotes: 5

Views: 2745

Answers (1)

Alexander Kahoun

Reputation: 2488

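Per the comment from Viktor Klang on the linked gist, this proves to be a great solution. The Sink is passed to GraphDSL.create, so its materialized value (a Future that completes when the Sink has finished) becomes the materialized value of the whole RunnableGraph: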

import akka.stream.ClosedShape
import akka.stream.scaladsl._
import scala.concurrent.Future

def consume(
    errorHandler: BadData => Unit, fn: Data => Unit, a: String
  ): RunnableGraph[Future[akka.Done]] = RunnableGraph.fromGraph(
    // The builder is typed by the Sink's materialized value, which the graph exposes
    GraphDSL.create(Sink.foreach[BadData](errorHandler)) { implicit b: GraphDSL.Builder[Future[akka.Done]] => sink =>
      import GraphDSL.Implicits._

      val source = b.add(Source.single(a))
      val broadcast = b.add(Broadcast[String](2))
      val merge = b.add(Zip[String, String]())
      val process = new ProcessorFlow(fn)
      val failed = b.add(Flow[Xor[BadData, Data]].filter(x => x.isLeft))
      val errors = b.add(new LeftFlow[Xor[BadData, Data], BadData](
        (input: Xor[BadData, Data]) =>
          input.swap.getOrElse((new Throwable, ("", "")))
      ))

      source ~> broadcast.in
                broadcast.out(0) ~> Flow[String].map(_.reverse)       ~> merge.in0
                broadcast.out(1) ~> Flow[String].map("| " + _ + " |") ~> merge.in1
                                                                         merge.out ~> process ~> failed ~> errors ~> sink

      ClosedShape
    }
  )

This allows me to call Await.result on the Future returned by running the RunnableGraph, for testing purposes. Thanks again to Viktor for this solution!
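To show how a test can wait on that Future, here is a minimal sketch. It does not use the Testing class or consume from the gist: graphFor is a hypothetical stand-in that builds one closed graph per item and exposes the Sink's Future[Done], and AwaitGraphSketch and runAll are likewise names made up for this example. It assumes an Akka version contemporary with the post, where ActorMaterializer is the usual way to obtain a materializer.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object AwaitGraphSketch {
  // Hypothetical stand-in for consume(...): a closed graph for a single item
  // whose materialized value is the Sink's completion Future.
  def graphFor(item: String, handler: String => Unit): RunnableGraph[Future[Done]] =
    Source.single(item).toMat(Sink.foreach[String](handler))(Keep.right)

  // Runs n graphs, blocks until every one has completed, and returns how many
  // items the handler saw.
  def runAll(n: Int): Int = {
    implicit val system: ActorSystem = ActorSystem("AwaitGraphSketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    import system.dispatcher // ExecutionContext for Future.sequence

    // The graphs run concurrently, so collect results in a thread-safe queue.
    val seen = new ConcurrentLinkedQueue[String]()
    try {
      val completions = (1 to n).map { i =>
        graphFor(i.toString, s => { seen.add(s); () }).run()
      }
      // Only look at the collected results after all streams are done.
      Await.result(Future.sequence(completions), 10.seconds)
      seen.size
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }
}
```

In a spec, the same pattern would mean collecting the Futures returned by each run() call and awaiting them (for example via Future.sequence) before the should statements. Because each item gets its own stream here, the handlers can run on different threads, which is why the sketch uses a concurrent queue rather than a plain List.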

Upvotes: 2
