Reputation: 5006
I am trying to wrap my head around Flow to build a parallel processing pipe with the following characteristics:

Elements are tuples of the form {offset, %{"id" => id}}, where offset is a monotonically incrementing integer and id is an arbitrary number, so different ids can be computed out of order in parallel. Hence the partition.

Here is an example stream to generate an infinite amount of these tuples:
stream = Stream.unfold(1, fn i ->
  offset = i + 1
  element = {offset, %{"id" => Enum.random(1..10_000)}}
  {element, offset}
end)
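Taking a few elements shows the shape of these tuples (ids are random, so the map values vary between runs):

```elixir
stream = Stream.unfold(1, fn i ->
  offset = i + 1
  {{offset, %{"id" => Enum.random(1..10_000)}}, offset}
end)

# Offsets are deterministic (2, 3, 4); the ids are random.
Enum.take(stream, 3)
```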
I want to partition the stream by the key id. I know how to do that and, for example, start 8 parallel stages:
Flow.from_enumerable(stream)
|> Flow.partition(
  key: fn {_, m} -> Map.get(m, "id") end,
  stages: 8
)
Every operation I run in this flow now happens in parallel; order is only kept per partition key id.

How can I now join the partitions back together, in order by offset?

To be clear, this is an infinite stream, so we need to keep in mind that we have to join over windows (I have tried multiple ways). A timeout of 10 seconds is fine to start dropping events that do not arrive from processing in time.
Here is an illustration of how I imagine it should work:
INCOMING
|
V
* PARTITIONING in N-stages by `id`
|\
|-\
|--\
|||| PARALLEL PROCESSING in order by `id`
|--/
|-/
|/
| JOIN in order by `offset`
| timing out after 10 seconds, moving on with the smallest known offset
|
| SEQUENTIAL PROCESSING of each offset of the JOIN
Upvotes: 3
Views: 914
Reputation: 51439
The joining part can be done by calling partition/2 once again, this time setting the number of stages to 1.
Here is a sample script that reproduces your use case by emitting tuples with offsets and random partitions:
1..10000
|> Stream.map(fn i -> {i, Enum.random([:a, :b, :c, :d])} end)
|> Flow.from_enumerable()
|> Flow.partition(key: {:elem, 1})
|> Flow.reduce(fn -> [] end, fn x, acc -> [process_x(x) | acc] end)
|> Flow.emit(:state)
|> Flow.partition(stages: 1)
|> Flow.reduce(fn -> [] end, &Kernel.++/2)
|> Flow.map_state(&Enum.sort/1)
|> Flow.emit(:state)
|> Enum.to_list()
|> IO.inspect
The tricky part is in partitioning. Once you partition, you have to accumulate state.
So after the first partition, we call Flow.reduce/3
, processing the elements and then putting them on the top of a list. Processing is done by calling process_x
which you will have to implement. Once you have processed all entries, we ask to emit the whole partition state, i.e. a list of events, to the next step.
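As a minimal stand-in (the module and function names here are placeholders, not part of Flow), process_x could return the tuple unchanged, so the later sort by offset still works:

```elixir
defmodule Worker do
  # Hypothetical stand-in for the real per-element work.
  # Whatever it returns is accumulated in the partition state and later
  # sorted, so it should keep the offset as the leading tuple element.
  def process_x({offset, partition}) do
    # ... the expensive per-id computation would go here ...
    {offset, partition}
  end
end
```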
Then we partition again, but this time into a single stage, which simply concatenates the results of the previous partitions and sorts them at the end.
Another point not considered in my example above is that your flow is infinite, so you need to add some windows. You need to choose how frequently to emit items from each partition. For the first partition, you can emit batches of 1000 elements. For the joined partition, you mentioned you want it to happen every 10 seconds. So let's add those in.
Finally, note the code above is not the most efficient because everything that runs in the last partition is serial (a single stage). Ideally you want to sort in the first partition and simply merge the sorted results in the last partition, with the help of a merge_sorted
function that you would define.
Here is the final result:
partition_window = Flow.Window.global |> Flow.Window.trigger_every(1000, :reset)
join_window = Flow.Window.global |> Flow.Window.trigger_periodically(10, :second, :reset)
1..10000
|> Stream.map(fn i -> {i, Enum.random([:a, :b, :c, :d])} end)
|> Flow.from_enumerable()
|> Flow.partition(key: {:elem, 1}, window: partition_window)
|> Flow.reduce(fn -> [] end, fn x, acc -> [process_x(x) | acc] end)
|> Flow.map_state(&Enum.sort/1)
|> Flow.emit(:state)
|> Flow.partition(stages: 1, window: join_window)
|> Flow.reduce(fn -> [] end, &merge_sorted/2)
|> Flow.emit(:state)
|> Enum.to_list()
|> IO.inspect
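For completeness, the merge_sorted/2 helper referenced above could be sketched as the classic merge step of mergesort (the module name is mine; it assumes both arguments are already-sorted lists of tuples, which compare by offset first):

```elixir
defmodule Merge do
  # Merge two already-sorted lists into one sorted list.
  # Used as the reducer: each incoming event is a sorted batch from an
  # upstream partition, merged into the sorted accumulator.
  def merge_sorted([], right), do: right
  def merge_sorted(left, []), do: left

  def merge_sorted([h1 | t1] = left, [h2 | t2] = right) do
    if h1 <= h2 do
      [h1 | merge_sorted(t1, right)]
    else
      [h2 | merge_sorted(left, t2)]
    end
  end
end
```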
Upvotes: 4