Synesso

Reputation: 38978

Converting paging function to a Flow

I have a large number of SQLite databases, represented as Source[File, NotUsed]. For each db, I want to paginate through the results. Memory limits mean I cannot do this eagerly. Say the result type is Foo; I'm trying to figure out how to create a Flow[File, Foo, NotUsed] that internally uses a lazy, recursive call on the resource.

I see that the Source.unfold method allows me to do this, but it can only create a Source, which means I can't feed it the necessary input of File. I can't see how to convert a Source to a Flow (except via fromSinkAndSource, but that doesn't pipe the values through). I'm not sure if this path of inquiry will yield anything.

It was suggested to me that I should use the GraphDSL and Merge, but I'm stuck trying to understand how many input ports the Merge should have and how I would actually wire it together.

Upvotes: 1

Views: 82

Answers (1)

Tim Moore

Reputation: 9482

I think you're looking for the flatMapConcat operator:

Signature

def flatMapConcat[T, M](f: Out ⇒ Graph[SourceShape[T], M]): Repr[T]

Description

Transform each input element into a Source whose elements are then flattened into the output stream through concatenation. This means each source is fully consumed before consumption of the next source starts.

emits when the current consumed substream has an element available

backpressures when downstream backpressures

completes when upstream completes and all consumed substreams complete
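
Applied to the question, this could be combined with Source.unfold roughly as in the sketch below. It is only an illustration under stated assumptions: Foo, PageSize, fetchPage, pages and fooFlow are hypothetical names, fetchPage fakes the SQLite query with five in-memory rows per database, and the implicit ActorSystem is assumed to supply the materializer (Akka 2.6 or later).

```scala
import java.io.File

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object PagingFlowSketch {
  // Hypothetical result type standing in for the question's Foo.
  final case class Foo(db: String, row: Int)

  // Hypothetical page size; a real query would use something larger.
  val PageSize = 2

  // Hypothetical stand-in for a real SQLite query (e.g. LIMIT/OFFSET).
  // It pretends that every database holds exactly 5 rows.
  def fetchPage(db: File, offset: Int): List[Foo] =
    (offset until math.min(offset + PageSize, 5)).map(Foo(db.getName, _)).toList

  // One lazy Source per database: the next page is fetched only when
  // downstream demands more elements, and the Source ends on an empty page.
  def pages(db: File): Source[Foo, NotUsed] =
    Source
      .unfold(0) { offset =>
        val page = fetchPage(db, offset)
        if (page.isEmpty) None else Some((offset + page.size, page))
      }
      .mapConcat(identity) // flatten each page into individual Foo elements

  // flatMapConcat turns the File => Source function into the desired Flow.
  val fooFlow: Flow[File, Foo, NotUsed] =
    Flow[File].flatMapConcat(db => pages(db))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("paging-sketch")
    val files = Source(List(new File("a.db"), new File("b.db")))
    val result = files.via(fooFlow).runWith(Sink.seq)
    // All rows of a.db are emitted before any row of b.db.
    println(Await.result(result, 5.seconds))
    system.terminate()
  }
}
```

Because flatMapConcat consumes one substream at a time, only the current page of the current database needs to be held in memory at any point.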

Upvotes: 1
