Reputation: 38978
I have a large quantity of sqlite databases, represented as Source[File, NotUsed]
. For each db, I want to paginate through the results. Memory limits mean I cannot do this eagerly. Say that the result type is Foo
, then I'm trying to figure out how to create a Flow[File, Foo, NotUsed]
that internally uses a lazy, recursive call on the resource.
I see that the Source.unfold
method allows me to do this, but it can only create a Source
, which means I can't feed it the necessary input of File
. I can't see how to convert a Source
to a Flow
(except via fromSinkAndSource, but that doesn't pipe the values through). I'm not sure if this path of inquiry will yield anything.
It was suggested to me that I should use the GraphDSL
and Merge
, but I'm stuck trying to understand how many input ports the Merge
should have and how I would actually wire it together.
Upvotes: 1
Views: 82
Reputation: 9482
I think you're looking for the flatMapConcat
operator:
Signature
def flatMapConcat[T, M](f: Out ⇒ Graph[SourceShape[T], M]): Repr[T]
Description
Transform each input element into a
Source
whose elements are then flattened into the output stream through concatenation. This means each source is fully consumed before consumption of the next source starts.emits when the current consumed substream has an element available
backpressures when downstream backpressures
completes when upstream completes and all consumed substreams complete
Upvotes: 1