Reputation: 1843
I'm trying to read a file, decrypt it, and return the data. Because the file is potentially very big, I want to do this in a stream.
I cannot find a good pattern to implement the stream. I'm trying to do something like this:
let stream = stream::unfold(decrypted_init_length, |decrypted_length| async move {
    if decrypted_length < start + length {
        let mut encrypted_chunk = vec![0u8; encrypted_block_size];
        match f.read(&mut encrypted_chunk[..]) {
            Ok(size) => {
                if size > 0 {
                    let decrypted = my_decrypt_fn(&encrypted_chunk[..]);
                    let updated_decrypted_length = decrypted_length + decrypted.len();
                    Some((decrypted, updated_decrypted_length))
                } else {
                    None
                }
            }
            Err(e) => {
                println!("Error {}", e);
                None
            }
        }
    } else {
        None
    }
});
The problem is that f.read is not allowed in the above async closure; the compiler reports the following error:
89 | | match f.read(&mut encrypted_chunk[..]) {
| | -
| | |
| | move occurs because `f` has type `std::fs::File`, which does not implement the `Copy` trait
| | move occurs due to use in generator
I don't want to open f inside the closure itself. Is there a better way to fix this? I am OK with using a different crate, trait, or method (i.e. not stream::unfold).
Upvotes: 5
Views: 12896
Reputation: 1843
I found a solution: using the async-stream crate.
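One of the reasons stream::unfold did not work for me is that the async move block takes ownership of every outside variable it uses, for example the file handle f. The closure may be called many times, and f is not Copy, so the compiler refuses to move it out of the closure.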
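For reference, the move error can also be avoided while keeping the unfold shape: put the file handle into the state value, so that each call receives it by value and hands it back together with the next item. The following is only a sketch under assumptions. `unfold_collect` is a hypothetical std-only stand-in for `futures::stream::unfold` followed by collecting, `my_decrypt_fn` is replaced by a dummy XOR, and `decrypt_chunks`, `block_size`, and `limit` are illustrative names.

```rust
use std::future::Future;
use std::io::Read;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the real `my_decrypt_fn`: XOR with a fixed key.
fn my_decrypt_fn(chunk: &[u8]) -> Vec<u8> {
    chunk.iter().map(|b| b ^ 0x5a).collect()
}

/// Waker that does nothing; sufficient for futures that never return `Pending`.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Std-only stand-in for `futures::stream::unfold` plus collecting the items.
/// The closure has the same shape as the one `unfold` takes: it receives the
/// state by value and resolves to `Some((item, next_state))` or `None`.
fn unfold_collect<S, T, F, Fut>(init: S, mut f: F) -> Vec<T>
where
    F: FnMut(S) -> Fut,
    Fut: Future<Output = Option<(T, S)>>,
{
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut state = init;
    let mut items = Vec::new();
    loop {
        let mut fut = Box::pin(f(state));
        // Busy-polls; fine here because the futures below complete immediately.
        let step = loop {
            if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
                break v;
            }
        };
        match step {
            Some((item, next)) => {
                items.push(item);
                state = next;
            }
            None => return items,
        }
    }
}

/// The reader travels inside the state tuple, so nothing is moved out of the
/// closure's captured environment.
fn decrypt_chunks<R: Read>(reader: R, block_size: usize, limit: usize) -> Vec<Vec<u8>> {
    unfold_collect((reader, 0usize), move |(mut reader, decrypted_len)| async move {
        if decrypted_len >= limit {
            return None;
        }
        let mut chunk = vec![0u8; block_size];
        // Note: this is a blocking read inside an async block.
        match reader.read(&mut chunk) {
            Ok(0) => None,
            Ok(n) => {
                let decrypted = my_decrypt_fn(&chunk[..n]);
                let new_len = decrypted_len + decrypted.len();
                Some((decrypted, (reader, new_len)))
            }
            Err(e) => {
                eprintln!("Error {}", e);
                None
            }
        }
    })
}
```

A closure of this shape, with `(f, decrypted_init_length)` as the initial state, should also be accepted by `futures::stream::unfold`, since that function likewise passes the state by value and expects `Option<(item, state)>` back.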
Now with async-stream, I changed my code to the following, and it works (note the yield keyword enabled by this crate):
use async_stream::try_stream;
<snip>
try_stream! {
    while decrypted_length < start + length {
        match f.read(&mut encrypted_chunk[..]) {
            Ok(size) => {
                if size > 0 {
                    println!("read {} bytes", size);
                    let decrypted = my_decrypt_fn(&encrypted_chunk[..size], ..);
                    decrypted_length = decrypted_length + decrypted.len();
                    yield decrypted;
                } else {
                    break
                }
            }
            Err(e) => {
                println!("Error {}", e);
                break
            }
        }
    }
}
UPDATE:
I found that async-stream has some limitations that I could not ignore. I ended up implementing Stream directly and no longer use async-stream. Now my code looks like this:
pub struct DecryptFileStream {
    f: File,
    <other_fields>,
}
impl Stream for DecryptFileStream {
    type Item = io::Result<Vec<u8>>;
    fn poll_next(self: Pin<&mut Self>,
                 _cx: &mut Context<'_>) -> Poll<Option<io::Result<Vec<u8>>>> {
        // read the file `f` of self and business_logic
        //
        if decrypted.len() > 0 {
            Poll::Ready(Some(Ok(decrypted)))
        } else {
            Poll::Ready(None)
        }
    }
}
// Then use the above stream:
let stream = DecryptFileStream::new(...);
Response::new(Body::wrap_stream(stream))
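To make the skeleton above more concrete, here is a minimal sketch of what the elided "read and decrypt" step could look like. Everything in it is an assumption made for illustration: the local `Stream` trait only mirrors the shape of `futures::Stream` so the example compiles with the standard library alone, `my_decrypt_fn` is a dummy XOR, the struct is generic over any `Read` rather than tied to `File`, and `drain` is a hypothetical helper for trying it out.

```rust
use std::io::{self, Read};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local mirror of the `futures::Stream` trait (assumption: real code would
/// import the trait from the `futures` crate instead).
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Hypothetical stand-in for the real `my_decrypt_fn`: XOR with a fixed key.
fn my_decrypt_fn(chunk: &[u8]) -> Vec<u8> {
    chunk.iter().map(|b| b ^ 0x5a).collect()
}

/// Owns the reader and a reusable buffer for one encrypted block.
pub struct DecryptStream<R> {
    reader: R,
    buf: Vec<u8>,
}

impl<R> DecryptStream<R> {
    pub fn new(reader: R, block_size: usize) -> Self {
        Self { reader, buf: vec![0u8; block_size] }
    }
}

impl<R: Read + Unpin> Stream for DecryptStream<R> {
    type Item = io::Result<Vec<u8>>;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `get_mut` is allowed because the struct is `Unpin`.
        let this = self.get_mut();
        // Blocking read: always returns `Ready`, never `Pending`.
        match this.reader.read(&mut this.buf) {
            Ok(0) => Poll::Ready(None), // end of input
            Ok(n) => Poll::Ready(Some(Ok(my_decrypt_fn(&this.buf[..n])))),
            Err(e) => Poll::Ready(Some(Err(e))),
        }
    }
}

/// Waker that does nothing; enough for a stream that never returns `Pending`.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical helper: polls until the stream ends (or reports `Pending`).
pub fn drain<S: Stream + Unpin>(mut stream: S) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    while let Poll::Ready(Some(item)) = Pin::new(&mut stream).poll_next(&mut cx) {
        out.push(item);
    }
    out
}
```

Because the read is synchronous, each poll blocks the calling thread until the chunk has been read. That keeps the implementation short, but with a large file on a busy executor it may be worth moving the read to a blocking-friendly context.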
Upvotes: 5
Reputation: 11711
stream::unfold produces a value that implements Stream, which in Rust is used exclusively for asynchronous programming. If you want to do synchronous reading, what you're calling a "stream" corresponds to the Read trait in Rust. You can therefore call Read::read() to read some data from the current position of your File (limited by the length of the buffer you pass in) and then decrypt that data.
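A minimal sketch of that synchronous approach, wrapped in an Iterator so that only one block is held in memory at a time. The names `DecryptChunks` and `block_size` are illustrative, and `my_decrypt_fn` is replaced by a dummy XOR because the real function is not shown in the question.

```rust
use std::io::{self, Read};

/// Hypothetical stand-in for the real `my_decrypt_fn`: XOR with a fixed key.
fn my_decrypt_fn(chunk: &[u8]) -> Vec<u8> {
    chunk.iter().map(|b| b ^ 0x5a).collect()
}

/// Reads blocks from any `Read` (a `File`, for example) and decrypts each one.
struct DecryptChunks<R: Read> {
    reader: R,
    buf: Vec<u8>,
}

impl<R: Read> DecryptChunks<R> {
    fn new(reader: R, block_size: usize) -> Self {
        Self { reader, buf: vec![0u8; block_size] }
    }
}

impl<R: Read> Iterator for DecryptChunks<R> {
    type Item = io::Result<Vec<u8>>;

    fn next(&mut self) -> Option<Self::Item> {
        // `read` may return fewer bytes than the buffer holds, so only the
        // first `n` bytes are decrypted.
        match self.reader.read(&mut self.buf) {
            Ok(0) => None, // end of input
            Ok(n) => Some(Ok(my_decrypt_fn(&self.buf[..n]))),
            Err(e) => Some(Err(e)),
        }
    }
}
```

It would be used as `for chunk in DecryptChunks::new(file, encrypted_block_size) { ... }`. If the cipher requires whole blocks, `Read::read_exact` may be a better fit than `read`, since `read` is allowed to return short reads.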
Upvotes: 0