Reputation: 3104
I have been playing with Elixir Flow for some time now, and recently I tried to parallelize my workflow using Flow and Repo.stream like this:
endless_db_stream = MyRepo.stream(some_query)

MyRepo.transaction(fn ->
  endless_db_stream
  |> Flow.from_enumerable()
  |> Flow.each(&process(&1))
  |> Flow.run()
end)
But it just doesn't work. I did some research and stumbled on this comment from José Valim saying, basically, that Repo.stream isn't really compatible with GenStage, and I believe it is also not compatible with Flow (since Flow is built on top of GenStage).
My question is: has anyone used PostgreSQL as an unbounded data source for Flow before?
P.S. In the same GitHub thread there is a "hack" that wraps Repo.stream in a GenStage which then acts as a producer, but I was looking for a more streamlined approach, as I was planning to use Flow rather than GenStage directly.
Upvotes: 1
Views: 744
Reputation: 21
There are two ways we implement this in our projects.
1. The simpler way: use resource IDs as the data source. List all the resource IDs first, then fetch each resource separately inside the flow.
2. The usual way: use Stream.resource/3 to build a custom data source. Build a stream with Stream.resource/3 that runs a paginated query, so each step fetches one batch of resources.
For more detail, see "How to build Streams in Elixir easily with Stream.resource/3 Awesomeness".
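A minimal sketch of the Stream.resource/3 approach, assuming offset-based pagination. PagedSource and fetch_page are hypothetical names used only for illustration, and an in-memory list stands in for the table so the snippet runs without a Repo. With Ecto, fetch_page would presumably run a query with limit and offset through Repo.all/1.

```elixir
defmodule PagedSource do
  # Hypothetical helper, not part of Ecto or Flow: turns any "fetch one page"
  # function into a lazy stream via Stream.resource/3.
  #
  # fetch_page is assumed to take (offset, limit) and return a list of rows.
  def stream(fetch_page, page_size)
      when is_function(fetch_page, 2) and is_integer(page_size) and page_size > 0 do
    Stream.resource(
      # start_fun: the accumulator is the offset of the next page to fetch
      fn -> 0 end,
      # next_fun: fetch one page, emit its rows, and advance the offset;
      # an empty page is treated as the end of the data
      fn offset ->
        case fetch_page.(offset, page_size) do
          [] -> {:halt, offset}
          rows -> {rows, offset + length(rows)}
        end
      end,
      # after_fun: nothing to clean up, since each page is its own query
      fn _offset -> :ok end
    )
  end
end

# Stand-in for a database table (assumption for the sake of a runnable example).
table = Enum.to_list(1..10)
fetch_page = fn offset, limit -> Enum.slice(table, offset, limit) end

rows = PagedSource.stream(fetch_page, 3) |> Enum.to_list()
```

Because every page is fetched by a separate query, the stream does not need to run inside a transaction, so it can be handed to Flow.from_enumerable/1 and followed by Flow.map/2 and Flow.run/1 as usual. Note that offset pagination can skip or repeat rows if the table changes while it is being read; paginating on an ordered key such as the id is one way around that.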
Upvotes: 2