Lee

Reputation: 3104

Repo.stream as Flow's infinite data source

I have been playing with Elixir's Flow for some time, and recently I tried to parallelize my workflow by feeding Repo.stream into Flow:

endless_db_stream = MyRepo.stream(some_query)
MyRepo.transaction(fn ->
  endless_db_stream
  |> Flow.from_enumerable()
  |> Flow.each(&process(&1))
  |> Flow.run
end)

but it just doesn't work. I did some research and stumbled on this comment from José Valim saying, basically, that Repo.stream isn't really compatible with GenStage, and I believe it's also not compatible with Flow (since Flow is built on top of GenStage).

My question is: has anyone used PostgreSQL as an unbounded data source for Flow before?

P.S. The same GitHub thread contains a "hack" that wraps Repo.stream in a GenStage and uses it as a producer, but I'm looking for a more streamlined approach, since I plan to use Flow rather than GenStage directly.

Upvotes: 1

Views: 744

Answers (1)

Kenton Wang

Reputation: 21

We have implemented this in two ways in our projects.

  1. The simpler way: use resource ids as the data source.

    List all the resource ids first, then fetch each resource separately inside the flow.

  2. The normal way: use Stream.resource/3 to build a custom data source.

    Build a stream with Stream.resource/3 that runs a paginated query, fetching a batch of resources at a time.
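As a rough illustration of the first approach (ids as the data source), here is a minimal sketch. The module name IdSource and the functions list_ids/0, get_resource/1 and process/1 are made up for this example and stand in for real database calls; it also assumes the script can fetch the flow package through Mix.install.

```elixir
# Assumes Elixir 1.12+ so that Mix.install/1 is available.
Mix.install([{:flow, "~> 1.0"}])

defmodule IdSource do
  # Stand-ins for database calls (assumptions for this sketch). With Ecto they
  # might be `MyRepo.all(from r in Resource, select: r.id)` and
  # `MyRepo.get!(Resource, id)`.
  def list_ids, do: Enum.to_list(1..20)
  def get_resource(id), do: %{id: id, name: "resource-#{id}"}

  # Placeholder for the question's process/1.
  def process(resource), do: String.upcase(resource.name)

  def run do
    list_ids()
    # The ids are a plain list, so no transaction has to stay open.
    |> Flow.from_enumerable(max_demand: 5)
    # Each stage fetches its own resources, one id at a time.
    |> Flow.map(&get_resource/1)
    |> Flow.map(&process/1)
    # Flow does not guarantee ordering, so sort before returning.
    |> Enum.sort()
  end
end

results = IdSource.run()
```

The trade-off is one extra query per resource, which is probably fine when the work done per resource dominates the lookup cost.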

For more detail, see "How to build Streams in Elixir easily with Stream.resource/3 Awesomeness".
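The second approach (Stream.resource/3 with a paginated query) could look roughly like the sketch below. PagedSource and fetch_page are hypothetical names; fetch_page is assumed to take the last id seen and a page size and to return the next rows ordered by id. The demo uses an in-memory list instead of a real database so that it runs on its own.

```elixir
defmodule PagedSource do
  # `fetch_page` is an assumed 2-arity function. With Ecto it might wrap
  # something like:
  #   from(r in Resource, where: r.id > ^last_id, order_by: r.id, limit: ^limit)
  #   |> MyRepo.all()
  def stream(fetch_page, page_size \\ 500) do
    Stream.resource(
      # Start before the first id.
      fn -> 0 end,
      fn last_id ->
        case fetch_page.(last_id, page_size) do
          # No more rows: stop. For a truly unbounded source you could
          # sleep here and return {[], last_id} to poll again instead.
          [] -> {:halt, last_id}
          # Emit the page and remember the last id as the next cursor.
          rows -> {rows, List.last(rows).id}
        end
      end,
      # Nothing to clean up, since every page is its own query.
      fn _last_id -> :ok end
    )
  end
end

# In-memory stand-in for the table (assumption for the demo).
rows = for id <- 1..7, do: %{id: id}

fetch_page = fn last_id, limit ->
  rows |> Enum.filter(&(&1.id > last_id)) |> Enum.take(limit)
end

ids = fetch_page |> PagedSource.stream(3) |> Enum.map(& &1.id)
```

The result is an ordinary lazy enumerable, so it should be possible to pass it to Flow.from_enumerable/1 in place of Repo.stream. Because each page is a separate query, the flow does not need to run inside a single transaction.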

Upvotes: 2
