martyn
martyn

Reputation: 146

Akka Streams stop stream after process n elements

I have Akka Stream flow which is reading from file using alpakka, processing data and write into a file. I want to stop flow after processed n elements, count the time of duration and call system terminate. How can I achieve it?

My flow looks like that:

val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
        import GraphDSL.Implicits._

 sourceFile ~> mainFlow ~> sinkFile

ClosedShape
})

graph.run()

Do you have an idea? Thanks

Upvotes: 3

Views: 1467

Answers (2)

Viktor Klang
Viktor Klang

Reputation: 26597

No need for GraphDSL here.

val doneFuture = (sourceFile via mainFlow.take(N) runWith sinkFile) transformWith { _ => system.terminate() }

To obtain time, you can use akka-streams-contrib: https://github.com/akka/akka-stream-contrib/blob/master/contrib/src/main/scala/akka/stream/contrib/Timed.scala

Upvotes: 2

Stefano Bonetti
Stefano Bonetti

Reputation: 9023

Agreeing with what @Viktor said, first of all you don't need to use the graphDSL to achieve this, and you can use take(n) to complete the graph.

Secondly, you can use mapMaterializedValue to attach a callback to your Sink materialized value (which in turn should materializes to a Future[Something]).

  val graph: RunnableGraph[Future[FiniteDuration]] =
    sourceFile
      .via(mainFlow)
      .take(N)
      .toMat(sinkFile)(Keep.right)
      .mapMaterializedValue { f ⇒
        val start = System.nanoTime()
        f.map(_ ⇒ FiniteDuration(System.nanoTime() - start, TimeUnit.NANOSECONDS))
      }

  graph.run().onComplete { duration ⇒
    println(s"Elapsed time: $duration")
  }

Note that you are going to need an ExecutionContext in scope.

EDIT

Even if you have to use the graphDSL, the same concepts apply. You only need to expose the materialized Future of your sink and map on that.

  val graph: RunnableGraph[Future[??Something??]] = 
    RunnableGraph.fromGraph(GraphDSL.create(sinkFile) { implicit builder: GraphDSL.Builder[Future[Something]] => snk =>
    import GraphDSL.Implicits._

    sourceFile ~> mainFlow ~> snk

    ClosedShape
  })

  val timedGraph: RunnableGraph[Future[FiniteDuration]] = 
    graph.mapMaterializedValue { f ⇒
      val start = System.nanoTime()
      f.map(_ ⇒ FiniteDuration(System.nanoTime() - start, TimeUnit.NANOSECONDS))
    }

  timedGraph.run().onComplete { duration ⇒
    println(s"Elapsed time: $duration")
  }

Upvotes: 2

Related Questions