Overbryd

Reputation: 5006

How to partition by key and join back in order with Elixir Flow

I am trying to wrap my head around Flow to build a parallel processing pipe with the following characteristics:

  1. Incoming events have the form {offset, %{"id" => id}}
  2. offset is a monotonically increasing integer
  3. id is an arbitrary number
  4. I only need to retain the order for every id, so different ids can be computed out of order in parallel. Hence the partition.

Here is an example stream to generate an infinite amount of these tuples:

# Emits an infinite stream of {offset, %{"id" => id}} tuples with increasing offsets
stream = Stream.unfold(1, fn i ->
  offset = i + 1
  element = {offset, %{"id" => Enum.random(1..10_000)}}
  {element, offset}
end)

I want to partition the stream by the key id. I know how to do that, for example starting 8 parallel stages:

Flow.from_enumerable(stream)
|> Flow.partition(
  key: fn {_, m} -> Map.get(m, "id") end,
  stages: 8
)

Every operation that follows in this flow now happens in parallel; order is only kept per partition key id.

Now for the part I do not understand: how do I join the stream back into a single stage, ordered by offset?

To be clear, this is an infinite stream, so we need to keep in mind that the join has to happen over a window (I have tried multiple ways). A timeout of 10 seconds is fine, after which we can start dropping events that did not arrive from processing in time.

Here is an illustration of how I imagine it should work:

INCOMING
|
V
* PARTITIONING in N stages by `id`
|\
|-\
|--\
|||| PARALLEL PROCESSING in order by `id`
|--/
|-/
|/
| JOIN in order by `offset`
| timing out after 10 seconds, moving on with the smallest known offset
|
| SEQUENTIAL PROCESSING of each offset of the JOIN

Upvotes: 3

Views: 914

Answers (1)

José Valim

Reputation: 51439

The joining part can be done by calling partition/2 once again but setting the number of stages to 1.

Here is a sample script that reproduces your use case by emitting tuples with offsets and random partitions:

1..10000
|> Stream.map(fn i -> {i, Enum.random([:a, :b, :c, :d])} end)
|> Flow.from_enumerable()
|> Flow.partition(key: {:elem, 1})
|> Flow.reduce(fn -> [] end, fn x, acc -> [process_x(x) | acc] end)
|> Flow.emit(:state)
|> Flow.partition(stages: 1)
|> Flow.reduce(fn -> [] end, &Kernel.++/2)
|> Flow.map_state(&Enum.sort/1)
|> Flow.emit(:state)
|> Enum.to_list()
|> IO.inspect

The tricky part is in partitioning. Once you partition, you have to accumulate state.

So after the first partition, we call Flow.reduce/3, processing the elements and prepending them to a list. Processing is done by calling process_x, which you will have to implement. Once all entries have been processed, we ask Flow to emit the whole partition state, i.e. a list of events, to the next step.
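
For illustration, a minimal stand-in for process_x/1 could look like the sketch below. It is not part of the original answer; it assumes each event is a {offset, partition_key} tuple as in the sample script, that processing is a pure transformation, and the module name Demo is arbitrary:

defmodule Demo do
  # Hypothetical placeholder: replace the body with the real per-event work
  # (enrich, validate, transform, ...). Returning the event unchanged keeps
  # the {offset, key} shape so the final stage can still sort by offset.
  def process_x({_offset, _key} = event) do
    event
  end
end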

Then we partition again, but this time into a single stage, which simply concatenates the results of the previous partitions and then sorts them at the end.

Another point not considered in my example above is that your flow is infinite, so you need to add some windows. You need to choose how frequently to emit items from each partition. For the first partition, you can emit batches of 1000 elements. For the joined partition, you mentioned you want it to happen every 10 seconds. So let's add those in.

Finally, note the code above is not the most efficient because everything that runs in the last partition is serial (a single stage). Ideally you want to sort in the first partition and simply merge the sorted results in the last partition, with the help of a merge_sorted function that you would define.

Here is the final result:

partition_window = Flow.Window.global |> Flow.Window.trigger_every(1000, :reset)
join_window = Flow.Window.global |> Flow.Window.trigger_periodically(10, :second, :reset)

1..10000
|> Stream.map(fn i -> {i, Enum.random([:a, :b, :c, :d])} end)
|> Flow.from_enumerable()
|> Flow.partition(key: {:elem, 1}, window: partition_window)
|> Flow.reduce(fn -> [] end, fn x, acc -> [process_x(x) | acc] end)
|> Flow.map_state(&Enum.sort/1)
|> Flow.emit(:state)
|> Flow.partition(stages: 1, window: join_window)
|> Flow.reduce(fn -> [] end, &merge_sorted/2)
|> Flow.emit(:state)
|> Enum.to_list()
|> IO.inspect
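
The merge_sorted/2 helper is left to the reader; one possible sketch is below, assuming both arguments are lists of {offset, key} tuples that are each already sorted (tuples compare element by element, so comparing them orders by offset first, matching Enum.sort/1). The module name Demo is again arbitrary:

defmodule Demo do
  # Classic merge step of merge sort: merges two already-sorted lists into one
  # sorted list. Flow.reduce/3 calls it with the sorted state emitted by an
  # upstream partition and the accumulator built so far.
  def merge_sorted([], sorted), do: sorted
  def merge_sorted(sorted, []), do: sorted

  def merge_sorted([l | l_rest] = left, [r | r_rest] = right) do
    if l <= r do
      [l | merge_sorted(l_rest, right)]
    else
      [r | merge_sorted(left, r_rest)]
    end
  end
end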

Upvotes: 4
