user2752148
user2752148

Reputation: 35

Rust Rayon possible bug

I think I may have found a bug in rayon, but before bothering the

maintainers I would would appreciate a sanity check.

I am in a class on multithreading and wrote a few implementatinos of "adding" two images. Here adding means doing a pixel-wise weighted sum with the images weighted at 0.7 and 0.3, respectively.

My implementation using rayon Bmp.image_add_mt0 result: https://imgur.com/a/GnRbY18

Removing the par_bridge() from image_add_mt0 fixes it.

My std::thread implementation Bmp.image_add_mt1 result: https://imgur.com/a/kRcp1U1

This has the same result as Bmp.image_add (the single-threaded version).

In both implementations, I chop the image up into row chunks for each thread to process. In the rayon version, the row chunks are garbled.

The inputs to this program are bmp images. If you want to use the same ones I used, let me know. It should work with any two images of the same size though.

My code:

Bmp.image_add_mt0

    fn image_add_mt0(
        &self,
        other: &Bmp,
        num_threads: usize,
    ) -> Self {
        rayon::ThreadPoolBuilder::new()
            .num_threads(num_threads)
            .build_global()
            .unwrap();

        let n_rows_in_chunk = self
            .header
            .height
            / num_threads;

        let bmp_out_img: Vec<Vec<Pixel>> = self
            .img
            .chunks(n_rows_in_chunk)
            .zip(
                other
                    .img
                    .chunks(n_rows_in_chunk),
            )
            .par_bridge()
            .flat_map(
                |(rows1_chunk, rows2_chunk)| {
                    rows1_chunk
                        .iter()
                        .zip(rows2_chunk.iter())
                        .map(
                            |(row1, row2)| {
                                row1.iter()
                                    .zip(row2.iter())
                                    .map(
                                        |(p1, p2)| {
                                            p1.weighted_add(
                                                p2,
                                            )
                                        },
                                    )
                                    .collect::<Vec<Pixel>>()
                            },
                        )
                        .collect::<Vec<Vec<Pixel>>>()
                },
            )
            .collect();

        Self {
            header: self.header,
            img: bmp_out_img,
            row_size: self.row_size,
            _n_colors: self._n_colors,
        }
    }

Bmp.image_add_mt1:

    fn image_add_mt1(
        &self,
        other: &Bmp,
        num_threads: usize,
    ) -> Self {
        let mut bmp_out = self.clone();

        let row_chunk_size = self
            .header
            .height
            / num_threads;

        std::thread::scope(
            |scope| {
                let mut thread_handles = vec![];
                for rows_chunk in izip!(
                    self.img
                        .chunks(row_chunk_size),
                    other
                        .img
                        .chunks(row_chunk_size),
                    bmp_out
                        .img
                        .chunks_mut(row_chunk_size)
                ) {
                    thread_handles.push(
                scope.spawn(
                    move || {
                        let (
                            rows1_chunk,
                            rows2_chunk,
                            rows_out_chunk,
                        ) = rows_chunk;
                        for (row1, row2, row_out) in izip!(
                            rows1_chunk,
                            rows2_chunk,
                            rows_out_chunk
                        ) {
                            for (p1, p2, p_out) in izip!(
                                row1.iter(),
                                row2.iter(),
                                row_out.iter_mut()
                            ) {
                                *p_out =
                                    p1.weighted_add(p2);
                            }
                        }
                    },
                ),
            )
                }
                for handle in thread_handles {
                    handle
                        .join()
                        .unwrap();
                }
            },
        );

        bmp_out
    }

main.rs:

#![feature(portable_simd)]

use anyhow::{Context, Error, Result, anyhow};
use itertools::izip;
use rayon::prelude::*;
use std::simd::u8x32;
use std::time::Instant;

const MAX_THREADS: usize = 128;

fn main() -> Result<()> {
    let args: Vec<_> = std::env::args().collect();

    let (num_threads, mt_func_number): (
        usize,
        u8,
    ) = match args.len() {
        4 => (0, 0),
        5 => (
            args[4].parse().context(format!("Expected number for threads parameter, got {}.", args[4]))?,
            0,
        ),
        6 => (
            args[4].parse().context(format!("Expected number for threads parameter, got {}.", args[4]))?,
            args[5].parse().context(format!("Expected number for MT Function Number parameter, got {}.", args[5]))?,
        ),
        _ => {
            return Err(
                anyhow!("Invalid input arguments",),
            );
        }
    };
    let input_file1 = args[1].clone();
    let input_file2 = args[2].clone();
    let output_file = args[3].clone();

    let bmp1 = Bmp::open(input_file1)?;

    let bmp2 = Bmp::open(input_file2)?;

    if bmp1
        .header
        .width
        != bmp2
            .header
            .width
        || bmp1
            .header
            .height
            != bmp2
                .header
                .height
    {
        return Err(
            anyhow!(
                "Input images have different shapes, which is not supported.",
            ),
        );
    }

    let start_execution = Instant::now();

    let bmp_out = match num_threads {
        0 => bmp1.image_add(&bmp2),
        1..MAX_THREADS => match mt_func_number {
            0 => bmp1.image_add_mt0(
                &bmp2,
                num_threads,
            ),
            1 => bmp1.image_add_mt1(
                &bmp2,
                num_threads,
            ),
            2 => bmp1.image_add_mt2(
                &bmp2,
                num_threads,
            ),
            3 => bmp1.image_add_mt3(
                &bmp2,
                num_threads,
            ),
            _ => {
                return Err(
                    anyhow!(
                        "Expected MT function number between 0 and 3. Got {}",
                        mt_func_number
                    ),
                );
            }
        },
        _ => {
            return Err(
                anyhow!(
                    "Expected number of threads between 0 and {}. Got {}",
                    MAX_THREADS,
                    num_threads
                ),
            );
        }
    };

    let execution_duration = start_execution
        .elapsed()
        .as_micros();

    println!(
        "Total execution time:  {} ms  ( {} ns/pixel)",
        execution_duration,
        execution_duration as f64 * 1e3
            / (bmp1
                .header
                .height as f64
                * bmp1
                    .header
                    .width as f64)
    );

    bmp_out.save(output_file)?;
    Ok(())
}

#[derive(Clone)]
struct Bmp {
    header: BmpHeader,
    img: Vec<Vec<Pixel>>,
    row_size: usize,
    _n_colors: usize,
}

#[derive(Debug, Clone, Copy)]
struct BmpHeader {
    file_size: usize,
    data_offset: usize,
    dib_header_size: usize,
    width: usize,
    height: usize,
    bits_per_pixel: usize,
    compression_method: u32,
    image_size: usize,
    horizontal_resolution: u32,
    vertical_resolution: u32,
}

#[derive(Clone, Copy)]
union Pixel {
    bgr: Bgr,
    bytes: [u8; 3],
}

#[derive(Debug, Clone, Copy, Default)]
struct Bgr {
    b: u8,
    g: u8,
    r: u8,
}

impl Pixel {
    fn as_bgr(&self) -> Bgr {
        unsafe { self.bgr }
    }
    fn as_bytes(&self) -> &[u8] {
        unsafe { &self.bytes }
    }
    fn weighted_add(&self, other: &Self) -> Self {
        let b = (self
            .as_bgr()
            .b as f32
            * 0.7
            + other
                .as_bgr()
                .b as f32
                * 0.3) as u8;
        let g = (self
            .as_bgr()
            .g as f32
            * 0.7
            + other
                .as_bgr()
                .g as f32
                * 0.3) as u8;
        let r = (self
            .as_bgr()
            .r as f32
            * 0.7
            + other
                .as_bgr()
                .r as f32
                * 0.3) as u8;

        Self {
            bgr: Bgr {
                b,
                g,
                r,
            },
        }
    }

    fn weighted_add_approximate(
        &self,
        other: &Self,
    ) -> Self {
        let b = (self
            .as_bgr()
            .b
            >> 1)
            + (other
                .as_bgr()
                .b
                >> 2);
        let g = (self
            .as_bgr()
            .g
            >> 1)
            + (other
                .as_bgr()
                .g
                >> 2);
        let r = (self
            .as_bgr()
            .r
            >> 1)
            + (other
                .as_bgr()
                .r
                >> 2);

        Self {
            bgr: Bgr {
                b,
                g,
                r,
            },
        }
    }
}

impl TryFrom<&[u8]> for Pixel {
    type Error = Error;
    fn try_from(s: &[u8]) -> Result<Self> {
        match s.len() {
            3 => Ok(
                Pixel {
                    bgr: Bgr {
                        b: s[0],
                        g: s[1],
                        r: s[2],
                    },
                },
            ),
            l => Err(
                anyhow!(
                    "Tried to convert a slice of length {} into a pixel. This is not supported.",
                    l
                ),
            ),
        }
    }
}

impl From<&Pixel> for Vec<u8> {
    fn from(pixel: &Pixel) -> Self {
        pixel
            .as_bytes()
            .to_owned()
    }
}

impl TryFrom<&[u8]> for BmpHeader {
    type Error = Error;
    fn try_from(header: &[u8]) -> Result<Self> {
        match String::from_utf8(header[0..2].into())?
            .as_str()
        {
            "BM" => println!(
                "File starts with 'BM', which is a valid start."
            ),
            x => {
                return Err(
                    anyhow!(
                        "First two bytes are '{}', which is not valid or supported.",
                        x
                    ),
                );
            }
        };

        let file_size =
            u32::from_le_bytes(header[2..6].try_into()?)
                as usize;
        let data_offset =
            u32::from_le_bytes(header[10..14].try_into()?)
                as usize;

        let dib_header_size =
            u32::from_le_bytes(header[14..18].try_into()?)
                as usize;
        match dib_header_size {
            40 => println!(
                "DIB header is of type BITMAPINFOHEADER, based on DIB header size of 40"
            ),
            x => {
                return Err(
                    anyhow!(
                        "DIB header size {} is unknown and unsupported.",
                        x
                    ),
                );
            }
        };

        let (_main_header, remaining_bytes) =
            header.split_at(14);
        let (dib_header, _remaining_bytes) =
            remaining_bytes.split_at(dib_header_size);

        let width = u32::from_le_bytes(
            dib_header[4..8].try_into()?,
        ) as usize;
        let height = u32::from_le_bytes(
            dib_header[8..12].try_into()?,
        ) as usize;
        let color_planes = u16::from_le_bytes(
            dib_header[12..14].try_into()?,
        ) as usize;
        if color_planes != 1 {
            return Err(
                anyhow!(
                    "Number of color planes must be 1, not {}",
                    color_planes
                ),
            );
        }
        let bits_per_pixel = u16::from_le_bytes(
            dib_header[14..16].try_into()?,
        ) as usize;

        let compression_method = u32::from_le_bytes(
            dib_header[16..20].try_into()?,
        );
        match compression_method {
            0 => println!("Compression method used: None"),
            x => {
                return Err(
                    anyhow!(
                        "Unsupported compression type code {}",
                        x
                    ),
                );
            }
        };

        let image_size = u32::from_le_bytes(
            dib_header[20..24].try_into()?,
        ) as usize;

        // resolution in pixels per meter
        let horizontal_resolution = u32::from_le_bytes(
            dib_header[24..28].try_into()?,
        );
        let vertical_resolution = u32::from_le_bytes(
            dib_header[28..32].try_into()?,
        );

        let _num_colors = u32::from_le_bytes(
            dib_header[32..36].try_into()?,
        );

        let _num_important_colors = u32::from_le_bytes(
            dib_header[36..40].try_into()?,
        );
        Ok(
            Self {
                file_size,
                data_offset,
                dib_header_size,
                width,
                height,
                bits_per_pixel,
                compression_method,
                image_size,
                horizontal_resolution,
                vertical_resolution,
            },
        )
    }
}

impl TryFrom<BmpHeader> for Vec<u8> {
    type Error = Error;

    fn try_from(bmp_header: BmpHeader) -> Result<Self> {
        let mut header_bytes: Vec<u8> =
            Vec::with_capacity(bmp_header.data_offset);

        header_bytes.extend_from_slice("BM".as_bytes());
        header_bytes.extend(
            u32::try_from(bmp_header.file_size)?
                .to_le_bytes(),
        );
        header_bytes.extend([0; 4]);
        header_bytes.extend(
            u32::try_from(bmp_header.data_offset)?
                .to_le_bytes(),
        );
        header_bytes.extend(
            u32::try_from(bmp_header.dib_header_size)?
                .to_le_bytes(),
        );
        header_bytes.extend(
            u32::try_from(bmp_header.width)?.to_le_bytes(),
        );
        header_bytes.extend(
            u32::try_from(bmp_header.height)?.to_le_bytes(),
        );
        header_bytes.extend(1u16.to_le_bytes());
        header_bytes.extend(
            u16::try_from(bmp_header.bits_per_pixel)?
                .to_le_bytes(),
        );
        header_bytes.extend(
            bmp_header
                .compression_method
                .to_le_bytes(),
        );
        header_bytes.extend(
            u32::try_from(bmp_header.image_size)?
                .to_le_bytes(),
        );
        header_bytes.extend(
            bmp_header
                .horizontal_resolution
                .to_le_bytes(),
        );
        header_bytes.extend(
            bmp_header
                .vertical_resolution
                .to_le_bytes(),
        );
        header_bytes.extend([0; 8]);

        Ok(header_bytes)
    }
}

impl Bmp {
    fn open<P: AsRef<std::path::Path>>(
        path: P,
    ) -> Result<Self> {
        let bmp_file =
            std::fs::read(&path).with_context(|| {
                format!(
                    "Failed to read file from {}",
                    path.as_ref()
                        .display()
                )
            })?;

        let data_offset = u32::from_le_bytes(
            bmp_file[10..14].try_into()?,
        ) as usize;

        let (header_bytes, remaining_bytes) =
            bmp_file.split_at(data_offset);

        let header = BmpHeader::try_from(header_bytes)?;

        let img_bytes =
            remaining_bytes[..header.image_size].to_owned();

        let row_size =
            ((header.bits_per_pixel * header.width + 24)
                / 32)
                * 4;

        let _n_colors: usize = 3;
        let img: Vec<Vec<Pixel>> = img_bytes
            .chunks(row_size)
            .map(
                |row| {
                    row[..header.width * 3]
                        .chunks_exact(_n_colors)
                        .map(
                            |pixel_slice| {
                                pixel_slice
                                    .try_into()
                                    .unwrap()
                            },
                        )
                        .collect()
                },
            )
            .collect();

        Ok(
            Self {
                header,
                img,
                row_size,
                _n_colors,
            },
        )
    }

    fn save<P: AsRef<std::path::Path>>(
        &self,
        path: P,
    ) -> Result<()> {
        let mut file_bytes: Vec<u8> = Vec::with_capacity(
            self.header
                .file_size,
        );

        let header_bytes: Vec<u8> = self
            .header
            .try_into()?;
        file_bytes.extend(header_bytes);
        let img_bytes = self
            .img
            .iter()
            .flat_map(
                |row| {
                    let mut row_bytes: Vec<u8> = row
                        .iter()
                        .flat_map(|pixel| Vec::from(pixel))
                        .collect();
                    row_bytes.extend(vec![
                        0u8;
                        self.row_size
                            - self
                                .header
                                .width
                                * 3
                    ]);
                    row_bytes
                },
            );
        file_bytes.extend(img_bytes);
        std::fs::write(
            path, file_bytes,
        )?;

        Ok(())
    }

    fn image_add(&self, other: &Bmp) -> Self {
        let mut bmp_out = self.clone();

        bmp_out.img = self
            .img
            .iter()
            .zip(
                other
                    .img
                    .iter(),
            )
            .map(
                |(row1, row2)| {
                    row1.iter()
                        .zip(row2.iter())
                        .map(|(p1, p2)| p1.weighted_add(p2))
                        .collect()
                },
            )
            .collect();

        bmp_out
    }

    fn image_add_mt0(
        &self,
        other: &Bmp,
        num_threads: usize,
    ) -> Self {
        rayon::ThreadPoolBuilder::new()
            .num_threads(num_threads)
            .build_global()
            .unwrap();

        let n_rows_in_chunk = self
            .header
            .height
            / num_threads;

        let bmp_out_img: Vec<Vec<Pixel>> = self
            .img
            .chunks(n_rows_in_chunk)
            .zip(
                other
                    .img
                    .chunks(n_rows_in_chunk),
            )
            .par_bridge()
            .flat_map(
                |(rows1_chunk, rows2_chunk)| {
                    rows1_chunk
                        .iter()
                        .zip(rows2_chunk.iter())
                        .map(
                            |(row1, row2)| {
                                row1.iter()
                                    .zip(row2.iter())
                                    .map(
                                        |(p1, p2)| {
                                            p1.weighted_add(
                                                p2,
                                            )
                                        },
                                    )
                                    .collect::<Vec<Pixel>>()
                            },
                        )
                        .collect::<Vec<Vec<Pixel>>>()
                },
            )
            .collect();

        Self {
            header: self.header,
            img: bmp_out_img,
            row_size: self.row_size,
            _n_colors: self._n_colors,
        }
    }

    fn image_add_mt1(
        &self,
        other: &Bmp,
        num_threads: usize,
    ) -> Self {
        let mut bmp_out = self.clone();

        let row_chunk_size = self
            .header
            .height
            / num_threads;

        std::thread::scope(
            |scope| {
                let mut thread_handles = vec![];
                for rows_chunk in izip!(
                    self.img
                        .chunks(row_chunk_size),
                    other
                        .img
                        .chunks(row_chunk_size),
                    bmp_out
                        .img
                        .chunks_mut(row_chunk_size)
                ) {
                    thread_handles.push(
                scope.spawn(
                    move || {
                        let (
                            rows1_chunk,
                            rows2_chunk,
                            rows_out_chunk,
                        ) = rows_chunk;
                        for (row1, row2, row_out) in izip!(
                            rows1_chunk,
                            rows2_chunk,
                            rows_out_chunk
                        ) {
                            for (p1, p2, p_out) in izip!(
                                row1.iter(),
                                row2.iter(),
                                row_out.iter_mut()
                            ) {
                                *p_out =
                                    p1.weighted_add(p2);
                            }
                        }
                    },
                ),
            )
                }
                for handle in thread_handles {
                    handle
                        .join()
                        .unwrap();
                }
            },
        );

        bmp_out
    }

    fn image_add_mt2(
        &self,
        other: &Bmp,
        num_threads: usize,
    ) -> Self {
        let mut bmp_out = self.clone();

        let row_chunk_size = self
            .header
            .height
            / num_threads;

        std::thread::scope(
            |scope| {
                let mut thread_handles = vec![];
                for rows_chunk in izip!(
                    self.img
                        .chunks(row_chunk_size),
                    other
                        .img
                        .chunks(row_chunk_size),
                    bmp_out
                        .img
                        .chunks_mut(row_chunk_size)
                ) {
                    thread_handles.push(
                scope.spawn(
                    move || {
                        let (
                            rows1_chunk,
                            rows2_chunk,
                            rows_out_chunk,
                        ) = rows_chunk;
                        for (row1, row2, row_out) in izip!(
                            rows1_chunk,
                            rows2_chunk,
                            rows_out_chunk
                        ) {
                            for (p1, p2, p_out) in izip!(
                                row1.iter(),
                                row2.iter(),
                                row_out.iter_mut()
                            ) {
                                *p_out =
                                    p1.weighted_add_approximate(p2);
                            }
                        }
                    },
                ),
            )
                }
                for handle in thread_handles {
                    handle
                        .join()
                        .unwrap();
                }
            },
        );

        bmp_out
    }

    fn image_add_mt3(
        &self,
        other: &Bmp,
        num_threads: usize,
    ) -> Self {
        let mut bmp_out = self.clone();

        let row_chunk_size = self
            .header
            .height
            / num_threads;

        let offset_limit = self
            .header
            .width
            * 3
            - self
                .header
                .width
                % 32;

        std::thread::scope(
            |scope| {
                let mut thread_handles = vec![];
                for rows_chunk in izip!(
                    self.img
                        .chunks(row_chunk_size),
                    other
                        .img
                        .chunks(row_chunk_size),
                    bmp_out
                        .img
                        .chunks_mut(row_chunk_size)
                ) {
                    thread_handles.push(
                scope.spawn(
                    move || {
                        let (
                            rows1_chunk,
                            rows2_chunk,
                            rows_out_chunk,
                        ) = rows_chunk;
                        for (row1, row2, row_out) in izip!(
                            rows1_chunk,
                            rows2_chunk,
                            rows_out_chunk
                        ) {
                            let r1_ptr = row1.as_ptr() as *const u8;
                            let r2_ptr = row2.as_ptr() as *const u8;
                            let ro_ptr = row_out.as_mut_ptr() as *mut u8;

                            for offset in (0..offset_limit).step_by(32){
                                unsafe{
                                    let r1_chunk: u8x32 = std::mem::transmute(*(r1_ptr.add(offset) as *const [u8;32]));
                                    let r2_chunk: u8x32 = std::mem::transmute(*(r2_ptr.add(offset) as *const [u8;32]));

                                    #[rustfmt::skip]
                                    let sum = r1_chunk / u8x32::from_array([2; 32]) + r2_chunk / u8x32::from_array([4; 32]);

                                    std::ptr::copy_nonoverlapping(&sum, ro_ptr.add(offset) as *mut u8x32, 32);
                                }
                            }
                            for offset in offset_limit..self.header.width * 3 {
                                unsafe{
                                    *ro_ptr.add(offset) = *r1_ptr.add(offset) / 2 + *r2_ptr.add(offset) / 4;
                                }
                            }
                        }
                    },
                ),
            )
                }
                for handle in thread_handles {
                    handle
                        .join()
                        .unwrap();
                }
            },
        );

        bmp_out
    }
}

Cargo.toml:

[package]
name = "imadd_rs"
version = "0.1.0"
edition = "2024"

[dependencies]
anyhow = "1.0.95"
itertools = "0.14.0"
rayon = "1.10.0"

Upvotes: -2

Views: 55

Answers (0)

Related Questions