webish
webish

Reputation: 711

Rust Streaming Pipeline

I'm finding Rust streaming documentation to be convoluted and examples non-existent.

Trying to create a code example will likely be more confusing than outlining a problem.

Given a read stream that is being passed to a consuming function, I'd like to introduce a middle-step to the stream pipeline that counts bytes, but could do anything, with minimal overhead.

read stream --> middle read stream function --> write stream

Example Pipeline:

First get file stream, then execute transformation on stream, finally stream to disk (new file).

Upvotes: 1

Views: 1115

Answers (1)

Siiir
Siiir

Reputation: 350

I have implemented Read adaptor that should be zero cost abstraction for your purpose of introducing “middle step”.
It's something like Iterator::map but for Read instances.
(I assume Read instances are what you call streams. You seem to be using terminology from Java/C++. If you meant something else, please clarify.).

use std::io::{Read, Write};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // It's your “read stream”. Could be any.
    let reader = std::fs::OpenOptions::new().read(true).open("in_file.txt")?;
    // It's your adaptor to perform “middle step”.
    let mut trans_reader = TransReader {
        orginal: reader,
        transform: |bytes: &mut [u8]| {
            // Could be any transformation.
            bytes.make_ascii_uppercase();
        },
    };
    // It's your “write stream”. Could be any.
    let mut writer = std::fs::OpenOptions::new()
        .create(true)
        .truncate(true)
        .write(true)
        .open("out_file.txt")?;

    // You can read bytes in anyway you want and then write. Eg. read all then write all.
    let mut buf = vec![];
    trans_reader.read_to_end(&mut buf)?;
    writer.write_all(&buf)?;

    Ok(())
}

// It's my implementation of adaptor you need.
pub struct TransReader<R: Read, F: FnMut(&mut [u8])> {
    pub orginal: R,
    pub transform: F,
}
impl<Org: Read, F: FnMut(&mut [u8])> Read for TransReader<Org, F> {
    // This is the only method that needs to be implemented.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let res = self.orginal.read(buf);
        if let Ok(written_count) = res {
            (self.transform)(&mut buf[..written_count]);
        }
        res
    }
    // I'm not 100% if manual impl. of other methods would also make for performance.
}

Upvotes: 1

Related Questions