user1783732
user1783732

Reputation: 1843

How to create a Stream from reading and transforming a file?

I'm trying to read a file, decrypt it, and return the data. Because the file is potentially very big, I want to do this in a stream.

I cannot find a good pattern to implement the stream. I'm trying to do something like this:

let stream = stream::unfold(decrypted_init_length, |decrypted_length| async move {
    if decrypted_length < start + length {
        let mut encrypted_chunk = vec![0u8; encrypted_block_size];
        match f.read(&mut encrypted_chunk[..]) {
            Ok(size) => {
                if size > 0 {
                    let decrypted = my_decrypt_fn(&encrypted_chunk[..]);
                    let updated_decrypted_length = decrypted_length + decrypted.len();
                    Some((decrypted, updated_decrypted_length))
                } else {
                    None
                }
            }
            Err(e) => {
                println!("Error {}", e);
                None
            }
        }
    } else {
        None
    }
});

The problem is that f.read is not allowed in the above async closure with the following error:

89  | |             match f.read(&mut encrypted_chunk[..]) {
    | |                   -
    | |                   |
    | |                   move occurs because `f` has type `std::fs::File`, which does not implement the `Copy` trait
    | |                   move occurs due to use in generator

I don't want to open f inside the closure itself. Is there any better way to fix this? I am OK with using a different crate or trait, or method (i.e. not stream::unfold).

Upvotes: 5

Views: 12896

Answers (2)

user1783732
user1783732

Reputation: 1843

I found a solution: using async-stream crate at here.

One of the reasons stream::unfold did not work for me is that the async move closure does not allow access mut variables outside, for example the f file handle.

Now with async-stream, I changed my code to the following, and it works: (note the yield added by this crate).

use async_stream::try_stream;

<snip>

    try_stream! {
        while decrypted_length < start + length {
            match f.read(&mut encrypted_chunk[..]) {
                Ok(size) => 
                    if size > 0 {
                        println!("read {} bytes", size);
                        let decrypted = my_decrypt_fn(&encrypted_chunk[..size], ..);
                        decrypted_length = decrypted_length + decrypted.len();
                        yield decrypted;
                    } else {
                        break
                    }
                Err(e) => {
                    println!("Error {}", e);
                    break
                }
            }
        }
    }

UPDATE:

I found that async-stream has some limitations that I cannot ignore. I ended up implementing Stream directly and no longer using async-stream. Now my code looks like this:

pub struct DecryptFileStream {
    f: File,
    <other_fields>,
}

impl Stream for DecryptFileStream {
    type Item = io::Result<Vec<u8>>;

    fn poll_next(self: Pin<&mut Self>,
                  _cx: &mut Context<'_>) -> Poll<Option<io::Result<Vec<u8>>>> {
         // read the file `f` of self and business_logic
         // 
         if decrypted.len() > 0 {
             Poll::Ready(Some(Ok(decrypted)))
         } else {
             Poll::Ready(None)
         }
    }
}

//. then use the above stream: 

    let stream = DecryptFileStream::new(...);
    Response::new(Body::wrap_stream(stream))

Upvotes: 5

djc
djc

Reputation: 11711

stream::unfold is only for types that implement Stream, which in Rust is used exclusively for asynchronous programming. If you want to do synchronous reading, what you're calling a "stream" is tagged as implementing Read in Rust. Thus you can call Read::read() to read some data off the current position of your File (limited by the length of the buffer you pass in), and decrypt that data.

Upvotes: 0

Related Questions