Reputation: 103
I'm beginning to learn Elixir and have come across a challenge that I haven't been able to solve easily.
I'm trying to create a function that takes an Enumerable.t and returns another Enumerable.t that includes the next n items. It would behave slightly differently from Enum.chunk(e, n, 1, []) in that the number of iterations would always equal the original enumerable's count. I also need to support Streams.
@spec lookahead(Enumerable.t, non_neg_integer) :: Enumerable.t
This is best illustrated with doctest syntax:
iex> lookahead(1..6, 1) |> Enum.to_list
[[1,2],[2,3],[3,4],[4,5],[5,6],[6]]
iex> lookahead(1..4, 2) |> Enum.to_list
[[1,2,3],[2,3,4],[3,4],[4]]
iex> Stream.cycle(1..4) |> lookahead(2) |> Enum.take(5)
[[1,2,3],[2,3,4],[3,4,1],[4,1,2],[1,2,3]]
iex> {:ok,io} = StringIO.open("abcd")
iex> IO.stream(io,1) |> lookahead(2) |> Enum.to_list
[["a","b","c"],["b","c","d"],["c","d"],["d"]]
I have investigated implementing the Enumerable.t protocol, but haven't quite understood the Enumerable.reduce interface.
Is there any succinct/elegant way of doing this?
My use case is for a small fixed n value (1 or 2) on a binary stream, so extra points for an optimized version. However, for the purpose of learning Elixir I'm interested in a solution that works across a number of use cases. Performance is important. I will run some benchmarks across various values of n for the solutions and publish the results.
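To pin down the semantics, here is an eager, list-only sketch of the expected output. The module name LookaheadSpec is made up for illustration, and it does not support Streams, so it is only a reference to compare against and not the solution I'm after:

```elixir
defmodule LookaheadSpec do
  # Eager, list-only reference for the expected output; not stream-friendly.
  def lookahead(list, n) when is_list(list) and is_integer(n) and n >= 0 do
    do_lookahead(list, n)
  end

  defp do_lookahead([], _n), do: []

  defp do_lookahead([_ | rest] = list, n) do
    # Each window is the current element plus up to n following elements.
    [Enum.take(list, n + 1) | do_lookahead(rest, n)]
  end
end
```

Every input element produces exactly one window, which is the property that plain chunking with a step of 1 does not give me.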
Benchmark Update - 8th April 2015
Six workable solutions have been posted. Details of the benchmarks are available at https://gist.github.com/spitsw/fce5304ec6941578e454. Benchmarks were run over a list with 500 items for various values of n.
Results for n=1:
PatrickSuspend.lookahead 104.90 µs/op
Warren.lookahead 174.00 µs/op
PatrickChunk.lookahead 310.60 µs/op
PatrickTransform.lookahead 357.00 µs/op
Jose.lookahead 647.60 µs/op
PatrickUnfold.lookahead 1484000.00 µs/op
Results for n=50:
PatrickSuspend.lookahead 220.80 µs/op
Warren.lookahead 320.60 µs/op
PatrickTransform.lookahead 518.60 µs/op
Jose.lookahead 1390.00 µs/op
PatrickChunk.lookahead 3058.00 µs/op
PatrickUnfold.lookahead 1345000.00 µs/op (faster than n=1)
Upvotes: 8
Views: 2362
Reputation: 31
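I might be late with an answer, but for a lookahead of one item it can be done with Stream.chunk_while/4:
defmodule Denis do
  def lookahead(enumerable) do
    chunk_fun = fn
      # first element: nothing to emit yet, just remember it
      element, nil -> {:cont, element}
      # emit the previous element together with the current one
      element, acc -> {:cont, [acc, element], element}
    end

    after_fun = fn
      # empty input: nothing left to emit
      nil -> {:cont, []}
      [] -> {:cont, []}
      # emit the last element on its own
      acc -> {:cont, [acc], []}
    end

    enumerable
    |> Stream.chunk_while(nil, chunk_fun, after_fun)
  end
end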
Upvotes: 1
Reputation: 369
After drawing inspiration from Warren, I made this. Basic usage:
iex> {peek, enum} = StreamSplit.peek 1..10, 3
{[1, 2, 3], #Function<57.77324385/2 in Stream.transform/3>}
iex> Enum.take(enum, 5)
[1, 2, 3, 4, 5]
https://hex.pm/packages/stream_split
Upvotes: 0
Reputation: 54674
I had started a discussion about my proposed Stream.mutate method on the elixir core mailing list, where Peter Hamilton suggested another way of solving this problem. By using make_ref to create a globally unique reference, we can create a padding stream and concatenate it with the original enumerable to continue emitting after the original stream has halted. This can then either be used in conjunction with Stream.chunk, which means we need to remove the unwanted references in a last step:
def lookahead(enum, n) do
  # unique marker used to pad the end of the stream
  stop = make_ref()

  enum
  |> Stream.concat(List.duplicate(stop, n))
  |> Stream.chunk(n + 1, 1)
  |> Stream.map(&Enum.reject(&1, fn x -> x == stop end))
end
I think this is the prettiest solution yet, from a syntactical point of view. Alternatively, we can use Stream.transform to build the buffer manually, which is quite similar to the manual solution I proposed earlier:
def lookahead(enum, n) do
  stop = make_ref()

  enum
  |> Stream.concat(List.duplicate(stop, n + 1))
  |> Stream.transform([], fn val, acc ->
    case {val, acc} do
      # padding reached and nothing left to flush
      {^stop, []} -> {[], []}
      # padding reached: flush the shrinking buffer
      {^stop, [_ | rest] = buf} -> {[buf], rest}
      # still filling the initial buffer
      {val, buf} when length(buf) < n + 1 -> {[], buf ++ [val]}
      # buffer full: emit it and slide by one
      {val, [_ | rest] = buf} -> {[buf], rest ++ [val]}
    end
  end)
end
I haven't benchmarked these solutions but I suppose the second one, although slightly clunkier, should perform a little bit better because it does not have to iterate over each chunk.
By the way, the second solution can be written without the case statement once Elixir allows the pin operator in function heads (probably in v1.1.0):
def lookahead(enum, n) do
  stop = make_ref()

  enum
  |> Stream.concat(List.duplicate(stop, n + 1))
  |> Stream.transform([], fn
    ^stop, [] -> {[], []}
    ^stop, [_ | rest] = buf -> {[buf], rest}
    val, buf when length(buf) < n + 1 -> {[], buf ++ [val]}
    val, [_ | rest] = buf -> {[buf], rest ++ [val]}
  end)
end
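Stream.chunk has since been deprecated in favour of Stream.chunk_every, so on newer Elixir versions the padding-and-chunking variant could look roughly like the sketch below. The module name PaddedLookahead and the guard are my own additions for illustration, and I have not benchmarked this form:

```elixir
defmodule PaddedLookahead do
  # Sketch of the padding approach using Stream.chunk_every/4.
  def lookahead(enum, n) when is_integer(n) and n >= 0 do
    # unique marker that cannot collide with real elements
    stop = make_ref()

    enum
    |> Stream.concat(List.duplicate(stop, n))
    # :discard drops the final incomplete chunk, like Stream.chunk/3 did
    |> Stream.chunk_every(n + 1, 1, :discard)
    # strip the padding markers from the trailing windows
    |> Stream.map(fn chunk -> Enum.reject(chunk, &(&1 == stop)) end)
  end
end
```

It stays lazy, so it also works on infinite streams such as Stream.cycle(1..4).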
Upvotes: 3
Reputation: 54674
As discussed in the comments, my first attempt had some performance problems and didn't work with streams that have side-effects, such as IO streams. I took the time to dig deeper into the stream library and finally came up with this solution:
defmodule MyStream do
  def lookahead(enum, n) do
    step = fn val, _acc -> {:suspend, val} end
    next = &Enumerable.reduce(enum, &1, step)
    &do_lookahead(n, :buffer, [], next, &1, &2)
  end

  # stream suspended
  defp do_lookahead(n, state, buf, next, {:suspend, acc}, fun) do
    {:suspended, acc, &do_lookahead(n, state, buf, next, &1, fun)}
  end

  # stream halted
  defp do_lookahead(_n, _state, _buf, _next, {:halt, acc}, _fun) do
    {:halted, acc}
  end

  # initial buffering
  defp do_lookahead(n, :buffer, buf, next, {:cont, acc}, fun) do
    case next.({:cont, []}) do
      {:suspended, val, next} ->
        new_state = if length(buf) < n, do: :buffer, else: :emit
        do_lookahead(n, new_state, buf ++ [val], next, {:cont, acc}, fun)

      {_, _} ->
        do_lookahead(n, :emit, buf, next, {:cont, acc}, fun)
    end
  end

  # emitting
  defp do_lookahead(n, :emit, [_ | rest] = buf, next, {:cont, acc}, fun) do
    case next.({:cont, []}) do
      {:suspended, val, next} ->
        do_lookahead(n, :emit, rest ++ [val], next, fun.(buf, acc), fun)

      {_, _} ->
        do_lookahead(n, :emit, rest, next, fun.(buf, acc), fun)
    end
  end

  # buffer empty, halting
  defp do_lookahead(_n, :emit, [], _next, {:cont, acc}, _fun) do
    {:halted, acc}
  end
end
This may look daunting at first, but actually it's not that hard. I will try to break it down for you, but that's hard with a full-fledged example like this.
Let's start with a simpler example instead: a stream that endlessly repeats the value given to it. In order to emit a stream, we can return a function that takes an accumulator and a function as arguments. To emit a value, we call the function with two arguments: the value to emit and the accumulator acc. The accumulator is a tuple that consists of a command (:cont, :suspend or :halt) and tells us what the consumer wants us to do; the result we need to return depends on the operation. If the stream should be suspended, we return a three-element tuple of the atom :suspended, the accumulator and a function that will be called when the enumeration continues (sometimes called a "continuation"). For the :halt command, we simply return {:halted, acc}, and for the :cont command we emit a value by performing the recursive step as described above. The whole thing then looks like this:
defmodule MyStream do
  def repeat(val) do
    &do_repeat(val, &1, &2)
  end

  defp do_repeat(val, {:suspend, acc}, fun) do
    {:suspended, acc, &do_repeat(val, &1, fun)}
  end

  defp do_repeat(_val, {:halt, acc}, _fun) do
    {:halted, acc}
  end

  defp do_repeat(val, {:cont, acc}, fun) do
    do_repeat(val, fun.(val, acc), fun)
  end
end
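The same three clauses work for any generator. As a further sketch (the module CountSketch and the function count_from are hypothetical names, not part of the solution), here is a stream that counts upwards and carries its own state from one step to the next:

```elixir
defmodule CountSketch do
  # Emits start, start + 1, start + 2, ... forever.
  def count_from(start) when is_integer(start) do
    &do_count(start, &1, &2)
  end

  # consumer asked us to pause: hand back a continuation
  defp do_count(n, {:suspend, acc}, fun) do
    {:suspended, acc, &do_count(n, &1, fun)}
  end

  # consumer has had enough
  defp do_count(_n, {:halt, acc}, _fun) do
    {:halted, acc}
  end

  # emit the current number, then recurse with the next one
  defp do_count(n, {:cont, acc}, fun) do
    do_count(n + 1, fun.(n, acc), fun)
  end
end
```

Because Enumerable is implemented for two-argument functions, the result can be consumed directly, for example with CountSketch.count_from(1) |> Enum.take(3), or piped through Stream.map first.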
Now this is only one part of the puzzle. We can emit a stream, but we don't process an incoming stream yet. Again, to explain how that works it makes sense to construct a simpler example. Here, I will build a function that takes an enumerable and just suspends and re-emits for every value.
defmodule MyStream do
  def passthrough(enum) do
    step = fn val, _acc -> {:suspend, val} end
    next = &Enumerable.reduce(enum, &1, step)
    &do_passthrough(next, &1, &2)
  end

  defp do_passthrough(next, {:suspend, acc}, fun) do
    {:suspended, acc, &do_passthrough(next, &1, fun)}
  end

  defp do_passthrough(_next, {:halt, acc}, _fun) do
    {:halted, acc}
  end

  defp do_passthrough(next, {:cont, acc}, fun) do
    case next.({:cont, []}) do
      {:suspended, val, next} ->
        do_passthrough(next, fun.(val, acc), fun)

      {_, _} ->
        {:halted, acc}
    end
  end
end
The first clause sets up the next function that gets passed down to the do_passthrough function. It serves the purpose of getting the next value from the incoming stream. The step function that is internally used defines that we suspend for every item in the stream. The rest is pretty similar except for the last clause. Here, we call the next function with {:cont, []} to get a new value and process the result by means of a case statement. If there is a value, we get back {:suspended, val, next}; if not, the stream is halted and we pass that through to the consumer.
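To see the pulling half in isolation, here is a small sketch that drains an enumerable one value at a time using nothing but the suspending step function. PullSketch and pull_all are made-up names for illustration:

```elixir
defmodule PullSketch do
  # Pulls every value out of `enum`, one suspension at a time.
  def pull_all(enum) do
    # suspend after every element, handing the element back as the accumulator
    step = fn val, _acc -> {:suspend, val} end
    next = &Enumerable.reduce(enum, &1, step)
    do_pull(next, [])
  end

  defp do_pull(next, pulled) do
    case next.({:cont, nil}) do
      # a value arrived together with the continuation for the rest
      {:suspended, val, next} -> do_pull(next, [val | pulled])
      # {:done, _} or {:halted, _}: the source is exhausted
      {_, _} -> Enum.reverse(pulled)
    end
  end
end
```

Each call to next yields at most one element, which is what lets a consumer such as lookahead decide when to read further from the source.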
I hope that clarifies a few things about how to build streams in Elixir manually. Unfortunately, there's an awful lot of boilerplate required to work with streams. If you go back to the lookahead implementation now, you will see that there are only tiny differences, which are the actually interesting parts. There are two additional parameters: state, which serves to differentiate between the :buffer and :emit steps, and buf, which is pre-filled with n+1 items in the initial buffering step. In the emit phase, the current buffer is emitted and then shifted to the left on each iteration. We're done when the input stream halts or our stream is halted directly.
I am leaving my original answer here for reference:
Here's a solution that uses Stream.unfold/2 to emit a true stream of values according to your specification. This means you need to add Enum.to_list to the end of your first two examples to obtain the actual values.
defmodule MyStream do
  def lookahead(stream, n) do
    Stream.unfold split(stream, n + 1), fn
      {[], stream} ->
        nil

      {[_ | buf] = current, stream} ->
        {value, stream} = split(stream, 1)
        {current, {buf ++ value, stream}}
    end
  end

  defp split(stream, n) do
    {Enum.take(stream, n), Stream.drop(stream, n)}
  end
end
The general idea is that we keep a buf of the previous iterations around. On each iteration, we emit the current buf, take one value from the stream and append it to the end of the buf. This repeats until the buf is empty.
Example:
iex> MyStream.lookahead(1..6, 1) |> Enum.to_list
[[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6]]
iex> MyStream.lookahead(1..4, 2) |> Enum.to_list
[[1, 2, 3], [2, 3, 4], [3, 4], [4]]
iex> Stream.cycle(1..3) |> MyStream.lookahead(2) |> Enum.take(5)
[[1, 2, 3], [2, 3, 1], [3, 1, 2], [1, 2, 3], [2, 3, 1]]
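The trouble with side-effecting streams comes from split: Enum.take and Stream.drop each enumerate the source from the start. The sketch below counts how often the source is evaluated. SplitSketch and evaluations are hypothetical names, and the Agent is only there to do the counting:

```elixir
defmodule SplitSketch do
  # Same shape as the split/2 helper in the unfold-based solution.
  def split(stream, n), do: {Enum.take(stream, n), Stream.drop(stream, n)}

  # Returns how many times the source function ran while reading two elements.
  def evaluations do
    {:ok, counter} = Agent.start_link(fn -> 0 end)

    stream =
      Stream.map(1..3, fn x ->
        # side effect: record every evaluation of the source
        Agent.update(counter, &(&1 + 1))
        x
      end)

    {[1], rest} = split(stream, 1)
    # the dropped stream starts over, so element 1 is evaluated again
    [2] = Enum.take(rest, 1)

    count = Agent.get(counter, & &1)
    Agent.stop(counter)
    count
  end
end
```

Reading two elements costs three evaluations here, and the overhead grows with every further split. With an IO stream the repeated reads consume fresh input instead of replaying it, which is why the results come out wrong.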
Upvotes: 5
Reputation: 103
The below solution uses Stream.resource and the suspend capability of Enumerable.reduce. All of the examples pass.
In short, it uses Enumerable.reduce to build a list. It then suspends the reducer on each iteration, removing the head of the list and adding the newest item to the tail of the list. Finally, it produces the trailer of the stream when the reducer is :done or :halted. All this is coordinated using Stream.resource.
This would be more efficient if a FIFO queue were used instead of a list for each iteration.
Please provide feedback on any simplifications, efficiencies or bugs.
defmodule MyModule do
  def lookahead(enum, n) when n >= 0 do
    reducer = fn ->
      Enumerable.reduce(enum, {:cont, {0, []}}, fn
        item, {c, list} when c < n -> {:cont, {c + 1, list ++ [item]}} # Build up the first list
        item, {c, list} when c == n -> {:suspend, {c + 1, list ++ [item]}} # Suspend on first full list
        item, {c, [_ | list]} -> {:suspend, {c, list ++ [item]}} # Remove the first item and emit
      end)
    end

    Stream.resource(
      reducer,
      fn
        {:suspended, {_, list} = acc, fun} -> {[list], fun.({:cont, acc})}
        {:halted, _} = result -> lookahead_trail(n, result) # Emit the trailing items
        {:done, _} = result -> lookahead_trail(n, result) # Emit the trailing items
      end,
      fn
        {:suspended, acc, fun} -> fun.({:halt, acc}) # Ensure the reducer is halted after suspend
        _ -> nil
      end
    )
  end

  defp lookahead_trail(n, acc) do
    case acc do
      {action, {c, [_ | rest]}} when c > n -> {[], {action, {c - 1, rest}}} # List already emitted here
      {action, {c, [_ | rest] = list}} -> {[list], {action, {c - 1, rest}}} # Emit the next tail item
      acc -> {:halt, acc} # Finish off the stream
    end
  end
end
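As a rough idea of the FIFO variant mentioned above, the sliding step could use Erlang's :queue module instead of list ++ [item]. This is only a sketch of that one step, with hypothetical names (QueueWindowSketch, slide, to_window), and is not wired into the Stream.resource code:

```elixir
defmodule QueueWindowSketch do
  # Drops the oldest item (if any) and appends `item` at the back.
  def slide(queue, item) do
    case :queue.out(queue) do
      {{:value, _oldest}, rest} -> :queue.in(item, rest)
      {:empty, empty} -> :queue.in(item, empty)
    end
  end

  # Converts the window back to a list for emitting.
  def to_window(queue), do: :queue.to_list(queue)
end
```

Note that emitting each window as a list still needs :queue.to_list, which is linear in the window size, so any gain is limited to the slide itself and would have to be confirmed by a benchmark.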
Upvotes: 1
Reputation: 51349
Here is an inefficient implementation of such a function:
defmodule Lookahead do
  def lookahead(enumerable, n) when n > 0 do
    enumerable
    |> Stream.chunk(n + 1, 1, [])
    |> Stream.flat_map(fn list ->
      length = length(list)

      if length < n + 1 do
        # short final chunk: also emit one shorter tail per remaining element
        [list | Enum.scan(tl(list), list, fn _, acc -> Enum.drop(acc, 1) end)]
      else
        [list]
      end
    end)
  end
end
It builds on top of @hahuang65's implementation, except that we use Stream.flat_map/2 to check the length of each emitted item, adding the missing ones as soon as we detect the emitted item got shorter.
A hand-written implementation from scratch would be faster because we would not need to call length(list) on every iteration. The implementation above may be fine though if n is small. If n is fixed, you could even pattern match on the generated list explicitly.
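For example, a version specialised for n = 2 might look like the sketch below. The names FixedLookahead and lookahead2 are hypothetical, and it assumes a newer Elixir where Stream.chunk_every/4 replaces Stream.chunk/4:

```elixir
defmodule FixedLookahead do
  # Lookahead specialised for n = 2, matching on the chunk shape directly.
  def lookahead2(enum) do
    enum
    # with [] as leftover, the final incomplete chunk is emitted unpadded
    |> Stream.chunk_every(3, 1, [])
    |> Stream.flat_map(fn
      # full window: pass it through
      [_, _, _] = full -> [full]
      # final short chunk of two: add the missing single-element tail
      [_, b] = short -> [short, [b]]
      # input had only one element
      [_] = last -> [last]
    end)
  end
end
```

No call to length is needed because the clause heads already distinguish the three possible chunk sizes.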
Upvotes: 3
Reputation: 2180
You should be able to use Stream.chunk/4. It would look something like this:
defmodule MyMod do
  def lookahead(enum, amount) do
    Stream.chunk(enum, amount + 1, 1, [])
  end
end
With your inputs:
iex(2)> MyMod.lookahead(1..6, 1) |> Enum.to_list
[[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6]]
iex(3)> MyMod.lookahead(1..4, 2) |> Enum.to_list
[[1, 2, 3], [2, 3, 4], [3, 4]]
iex(4)> Stream.cycle(1..3) |> MyMod.lookahead(1) |> Enum.take(5)
[[1, 2], [2, 3], [3, 1], [1, 2], [2, 3]]
Upvotes: 1