MaatDeamon

Reputation: 9769

Akka-Stream stream within stream

I am trying to figure out how to handle a situation where one stage of a stream makes a call that returns an InputStream, and I want to treat that InputStream as the Source for the stages further down.

e.g.

 Source.map(e => call that returns an InputStream)
 .via(processingFlow).runWith(Sink.ignore)

I would like the elements going into the processing flow to be those coming from the InputStream. Basically, I am tailing a file and reading each line. Each line gives me the information for a call I need to make against a CLI API, and that call hands me its stdout as an InputStream from which to read the result. The results are going to be huge most of the time, so I can't just collect the whole thing in memory.

Upvotes: 0

Views: 301

Answers (1)

Stefano Bonetti

Reputation: 9023

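  • You can use the StreamConverters utilities to get Sources and Sinks from java.io streams. The Akka Streams documentation on StreamConverters has more details.
  • You can use flatMapConcat or flatMapMerge to flatten a stream of Sources into a single stream. The Akka Streams documentation on these operators has more details.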
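To illustrate the difference between the two flattening operators, here is a minimal sketch. It assumes Akka 2.6 or later (where an implicit ActorSystem is enough to run a stream); the object name `FlattenDemo` and the numeric sub-sources are made up for the illustration. flatMapConcat consumes one sub-source at a time and so preserves order, while flatMapMerge runs up to `breadth` sub-sources at once and gives no ordering guarantee across them.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object FlattenDemo {
  // Hypothetical sub-source: each input number produces two elements.
  private def subSource(n: Int) = Source(List(n, n * 10))

  // Sub-sources are drained strictly one after another, in upstream order.
  def concatenated(inputs: List[Int])(implicit system: ActorSystem): Seq[Int] =
    Await.result(Source(inputs).flatMapConcat(subSource).runWith(Sink.seq), 10.seconds)

  // Up to `breadth` sub-sources are consumed concurrently; the interleaving is not guaranteed.
  def merged(inputs: List[Int], breadth: Int)(implicit system: ActorSystem): Seq[Int] =
    Await.result(Source(inputs).flatMapMerge(breadth, subSource).runWith(Sink.seq), 10.seconds)
}
```

For the use case in the question, where each line's output should presumably stay contiguous, flatMapConcat is the more natural choice.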

A quick example could be:

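  import java.io.InputStream
  import akka.NotUsed
  import akka.stream.scaladsl.{Flow, Sink, Source, StreamConverters}
  import akka.util.ByteString

  // An implicit ActorSystem (or Materializer) must be in scope to run the stream.
  val source: Source[String, NotUsed] = ???
  def gimmeInputStream(name: String): InputStream = ???
  val processingFlow: Flow[ByteString, ByteString, NotUsed] = ???

  source
    .map(gimmeInputStream)
    // Each InputStream becomes a Source of ByteString chunks, read one stream at a time.
    .flatMapConcat(is => StreamConverters.fromInputStream(() => is, chunkSize = 8192))
    .via(processingFlow)
    .runWith(Sink.ignore)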
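For something that can be run end to end, here is a minimal sketch of the same idea. It assumes Akka 2.6 or later, and `openStdout` is a hypothetical stand-in for the CLI call: it returns an in-memory stream, where the real code would presumably return something like the process's `getInputStream`. The InputStream is opened inside the factory function, so it is only created when its sub-source is actually materialized, and the line framing is applied per sub-source so that a line never straddles two streams.

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.nio.charset.StandardCharsets.UTF_8

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Framing, Sink, Source, StreamConverters}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object NestedInputStreams {
  // Hypothetical stand-in for the CLI call: two lines of "output" per command.
  def openStdout(command: String): InputStream =
    new ByteArrayInputStream(s"$command: line 1\n$command: line 2\n".getBytes(UTF_8))

  def collectLines(commands: List[String])(implicit system: ActorSystem): Seq[String] = {
    val lines = Source(commands)
      .flatMapConcat { command =>
        // The stream is opened lazily, when this sub-source starts running.
        StreamConverters
          .fromInputStream(() => openStdout(command), chunkSize = 8192)
          // Re-chunk the raw bytes into lines (the frame length limit is an arbitrary choice).
          .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true))
      }
      .map(_.utf8String)
      .runWith(Sink.seq)

    // Blocking and Sink.seq are for demonstration only; huge outputs should go to a streaming sink.
    Await.result(lines, 10.seconds)
  }
}
```

Because the data is pulled chunk by chunk under backpressure, only a bounded amount of each stream is held in memory at a time, provided the downstream sink does not accumulate elements.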

However, if what you are reading are files, Akka Streams offers a more idiomatic DSL for reading and writing them in the FileIO object. The Akka Streams documentation on FileIO has more details.

The example becomes:

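  import java.nio.file.Paths
  import akka.NotUsed
  import akka.stream.scaladsl.{FileIO, Flow, Sink, Source}
  import akka.util.ByteString

  val source: Source[String, NotUsed] = ???
  val processingFlow: Flow[ByteString, ByteString, NotUsed] = ???

  source
    // Each file name becomes a Source of ByteString chunks read from that file.
    .flatMapConcat(name => FileIO.fromPath(Paths.get(name)))
    .via(processingFlow)
    .runWith(Sink.ignore)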
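As a runnable variant of the FileIO approach, the sketch below reads a list of files and emits their lines in order. It assumes Akka 2.6 or later; the object name `NestedFiles`, the newline delimiter, and the 1024-byte frame limit are choices made for the illustration, not something the answer prescribes.

```scala
import java.nio.file.Path

import akka.actor.ActorSystem
import akka.stream.scaladsl.{FileIO, Framing, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object NestedFiles {
  def readAllLines(paths: List[Path])(implicit system: ActorSystem): Seq[String] = {
    val lines = Source(paths)
      .flatMapConcat { path =>
        FileIO
          .fromPath(path)
          // Split each file into lines; allowTruncation keeps a last line with no trailing newline.
          .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true))
      }
      .map(_.utf8String)
      .runWith(Sink.seq)

    // Collecting into a Seq is for demonstration only.
    Await.result(lines, 10.seconds)
  }
}
```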

Upvotes: 1
