Reputation: 45
My goal: I want to read the first line of a file, restructure the input in a pipeline, and then process the rest of the file in another pipeline.
My problem: the stream resets with each new pipeline.
Example code:
```elixir
defmodule StrangeStream do
  fs = File.stream!("file.txt")
  Stream.take(fs, 1) |> Enum.to_list() |> IO.inspect()
  Stream.take(fs, 1) |> Enum.to_list() |> IO.inspect()
end
```
Text file file.txt:
```
First line.
Second line.
Third line.
```
Output:
```
["First line.\n"]
["First line.\n"]
```
As you can see, the stream resets in each pipeline: each one starts from the first line of the file. How do I maintain the state of the stream between pipelines? Thanks in advance!
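A sketch of one workaround, assuming re-reading the first line is acceptable: a File.Stream only describes how to read the file and reopens it every time it is enumerated, so take the first line with Enum.take/2 and skip it explicitly with Stream.drop/2 in the second pipeline. The temp-file path is a hypothetical stand-in for file.txt.

```elixir
# Hypothetical demo file standing in for file.txt.
path = Path.join(System.tmp_dir!(), "strange_stream_demo.txt")
File.write!(path, "First line.\nSecond line.\nThird line.\n")

fs = File.stream!(path)

# Enumerating `fs` opens the file, reads line 1, then closes it ...
[first_line] = Enum.take(fs, 1)

# ... so this pipeline starts at line 1 again; skip it explicitly.
rest =
  fs
  |> Stream.drop(1)
  |> Stream.map(&String.trim/1)
  |> Enum.to_list()

IO.inspect(String.trim(first_line), label: "first_line")
IO.inspect(rest, label: "rest of file")
```

The cost is that line 1 is read twice, which is usually negligible; the benefit is that no open device has to be managed by hand.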
Upvotes: 0
Views: 315
Reputation: 45
Here's how I got the effect I wanted. Hope it helps others who look into this.
Again, many thanks to Aleksei for saving me so much time.
```elixir
defmodule StrangeStream do
  do_stuff = fn something ->
    # We'd do something useful here
    something
  end

  # The device returned by File.open/2 remembers its read position.
  # (:line is not a File.open/2 mode; line-wise reading comes from IO.read/2.)
  {:ok, file} = File.open("file.txt", [:read])

  # Read the first line
  first_line =
    file
    |> IO.read(:line)
    |> String.trim()
    |> do_stuff.()
    |> IO.inspect(label: "first_line")

  # Create side-effect streams
  print_stream = IO.binstream(:stdio, :line)
  file_stream = File.stream!("output.txt", [:write, :append])

  # Convert the same device to a stream; it continues after the first line
  file
  |> IO.stream(:line)
  |> Stream.map(&String.trim/1)
  |> do_stuff.()
  |> Stream.into(print_stream, fn s -> s <> "\n" end)
  |> do_stuff.()
  |> Stream.into(file_stream)
  |> do_stuff.()
  |> Enum.to_list()
  |> IO.inspect(label: "rest of file")

  # Release the device once both pipelines are done
  File.close(file)
end
```
Output:
```
first_line: "First line."
Second line.
Third line.
rest of file: ["Second line.", "Third line."]
```
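The same technique, sketched as a function that runs at call time rather than during module compilation. File.open/3 with a callback closes the device once the callback returns. The module and function names (FirstLineThenRest.run/1) are hypothetical, and the print/write side-effect streams are left out to keep the sketch short. It uses IO.binread/2 and IO.binstream/2 because File.open/3 opens files in binary (non-:utf8) mode by default.

```elixir
defmodule FirstLineThenRest do
  # Hypothetical helper: returns {first_line, rest_lines} for the file at `path`.
  def run(path) do
    # File.open/3 closes the device after the callback and returns {:ok, result}.
    {:ok, result} =
      File.open(path, [:read], fn file ->
        # The device keeps its position, so this read consumes line 1 ...
        first = file |> IO.binread(:line) |> String.trim()

        # ... and a stream over the same device starts at line 2.
        rest =
          file
          |> IO.binstream(:line)
          |> Stream.map(&String.trim/1)
          |> Enum.to_list()

        {first, rest}
      end)

    result
  end
end

path = Path.join(System.tmp_dir!(), "first_line_then_rest.txt")
File.write!(path, "First line.\nSecond line.\nThird line.\n")

{first, rest} = FirstLineThenRest.run(path)
IO.inspect(first, label: "first_line")
IO.inspect(rest, label: "rest of file")
```

Note that the stream over the device must be consumed inside the callback; once File.open/3 returns, the device is closed.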
Upvotes: 1
Reputation: 121000
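TL;DR: you can’t.
There is no mutable state in Elixir, so a stream cannot remember where it stopped: a File.Stream only describes how to read the file, and it opens the file from the beginning every time it is enumerated.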
The closest thing would be to suspend the enumerable while reducing it, but even that is not possible directly with the Stream functions.
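The suspension mentioned above lives in the Enumerable protocol rather than in the Stream module. A minimal sketch, assuming a hypothetical PullLines module: a reducer that returns {:suspend, acc} makes Enumerable.reduce/3 return {:suspended, acc, continuation}, and calling the continuation resumes the same open file where it stopped. The caller must then either keep resuming or halt explicitly, otherwise the file is never closed, which is part of why this is awkward in practice.

```elixir
defmodule PullLines do
  # Hypothetical helper: start reducing and suspend right after the first element.
  # Returns {:suspended, element, continuation}.
  def start(enum) do
    Enumerable.reduce(enum, {:cont, nil}, fn elem, _acc -> {:suspend, elem} end)
  end

  # Resume a suspended reduction; it suspends again after the next element.
  def next({:suspended, _elem, cont}), do: cont.({:cont, nil})

  # Halt a suspended reduction; for File.Stream this runs the cleanup that closes the file.
  def stop({:suspended, _elem, cont}), do: cont.({:halt, nil})
end

path = Path.join(System.tmp_dir!(), "pull_lines_demo.txt")
File.write!(path, "First line.\nSecond line.\nThird line.\n")

# Pull line 1, then line 2, from a single enumeration of the file.
{:suspended, first, _} = step1 = PullLines.start(File.stream!(path))
{:suspended, second, _} = step2 = PullLines.next(step1)

# Stop early so the file is closed.
PullLines.stop(step2)

IO.inspect(first, label: "first_line")
IO.inspect(second, label: "second_line")
```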
You can resort to Stream.transform/4 and maintain the state yourself, choosing the pipeline accordingly.
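A minimal sketch of that idea, using Stream.transform/3 (the variant without the start and after callbacks of Stream.transform/4, which are not needed here). The accumulator records whether the first line has been seen, and each line is routed to one of two handlers. FirstLineSplit, handle_first/1 and handle_rest/1 are hypothetical names standing in for the two "pipelines".

```elixir
defmodule FirstLineSplit do
  # Hypothetical handlers standing in for the two different pipelines.
  def handle_first(line), do: {:header, String.upcase(line)}
  def handle_rest(line), do: {:row, line}

  # The accumulator (:first / :rest) is the state the stream itself cannot keep.
  def process(lines) do
    lines
    |> Stream.map(&String.trim/1)
    |> Stream.transform(:first, fn
      line, :first -> {[handle_first(line)], :rest}
      line, :rest -> {[handle_rest(line)], :rest}
    end)
  end
end

path = Path.join(System.tmp_dir!(), "transform_demo.txt")
File.write!(path, "First line.\nSecond line.\nThird line.\n")

result =
  path
  |> File.stream!()
  |> FirstLineSplit.process()
  |> Enum.to_list()

IO.inspect(result)
```

Because everything happens in one enumeration, the file is opened once and closed once, and the result is still a lazy stream until Enum.to_list/1 runs it.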
Sidenote: Enum.to_list/1 runs the stream to completion, and File.Stream closes the file as soon as enumeration finishes or halts, so the approach in the question would not work at all.
Upvotes: 0