Reputation: 9769
I am trying to figure out how to handle a situation where in one of your stage you need to make a call that return an InputStream, where I would deal with that stream as a Source of the stage that comes further down.
e.g.
Source.map(e => Calls that return an InputStream)
.via(processingFlow).runwith(sink.ignore)
I would like that the element going to Processing flow as those coming from the InputStream. This is basically a situation where I am tailing a file, reading each line, the line give me the information about a call I need to make against a CLI api, when making that call I get the Stdout as an InputStream from which to read the result. Result are going to be huge most of the time, so I can just collect the all thing in memory.
Upvotes: 0
Views: 301
Reputation: 9023
StreamConverters
utilities to get Source
s and Sink
s from java.io streams. More info here.flatMapConcat
or flatMapMerge
to flatten a stream of Source
s into a single stream. More info here.A quick example could be:
val source: Source[String, NotUsed] = ???
def gimmeInputStream(name: String): InputStream = ???
val processingFlow: Flow[ByteString, ByteString, NotUsed] = ???
source
.map(gimmeInputStream)
.flatMapConcat(is ⇒ StreamConverters.fromInputStream(() ⇒ is, chunkSize = 8192))
.via(processingFlow)
.runWith(Sink.ignore)
However Akka Streams offers a more idiomatic DSL to read/write files in the FileIO
object. More info here.
The example becomes:
val source: Source[String, NotUsed] = ???
val processingFlow: Flow[ByteString, ByteString, NotUsed] = ???
source
.flatMapConcat(name ⇒ FileIO.fromPath(Paths.get(name)))
.via(processingFlow)
.runWith(Sink.ignore)
Upvotes: 1