ivan199415

Parallel iteration through a file in Rust and storing results in a HashMap

Here is the problem. I have a large TSV file (some lines start with #, and I ignore them). Each data line has 6 tab-separated columns. The last column is a string that looks like (1,2)(3,4)(7,12), and one column is a non-unique ID (let's call it RefContigID; it is fields[1] in the code below). For each line I want the first number of the last pair (7 in the example above), and for each RefContigID I want to keep the lines that have the largest such value.

To that end, I've implemented the following solution in Rust:

Created a struct to hold the lines

The struct has a from_line constructor that builds it from a single line, and a last_position method that strips the outer parentheses, splits on )(, takes the last element, splits it on , and takes the first element.

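// Requires serde with the "derive" feature; from_line below parses manually and does not use it
use serde::Deserialize;

#[derive(Deserialize)]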
struct AlLine {
    qry_contig_id : u32,
    ref_contig_id : u8,
    ref_start : f64,
    ref_end : f64,
    conf : f64,
    align : String,
}

impl AlLine {
    fn from_line(line: &str) -> Self {
        let fields: Vec<&str> = line.split('\t').collect();
    
        let qry_contig_id: u32 = fields[0].parse().unwrap();
        let ref_contig_id: u8 = fields[1].parse().unwrap();
        let ref_start: f64 = fields[2].parse().unwrap();
        let ref_end: f64 = fields[3].parse().unwrap();
        let conf: f64 = fields[4].parse().unwrap();
        let align: String = fields[5].to_string();

        AlLine{qry_contig_id,ref_contig_id,ref_start,ref_end,conf,align}
    }

    fn last_position(&self) -> u32 {
        let align = &self.align;
        // "(1,2)(3,4)(7,12)" -> "1,2)(3,4)(7,12" -> last pair "7,12" -> "7"
        align[1..align.len() - 1]
            .split(")(")
            .last()
            .unwrap()
            .split(',')
            .next()
            .unwrap()
            .parse()
            .unwrap()
    }

}
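For reference, here is a panic-free variant of the same parsing, applied to the example string from above. The name last_first_number is hypothetical, and returning Option<u32> instead of unwrapping is an assumption about how malformed lines should be handled:

```rust
/// Hypothetical panic-free variant of `AlLine::last_position`:
/// returns the first number of the last "(a,b)" pair, or None if
/// the string is malformed.
fn last_first_number(align: &str) -> Option<u32> {
    // Strip the outer '(' and ')': "(1,2)(3,4)(7,12)" -> "1,2)(3,4)(7,12".
    let inner = align.strip_prefix('(')?.strip_suffix(')')?;
    // The last ")("-separated chunk is the last pair, e.g. "7,12".
    let last_pair = inner.rsplit(")(").next()?;
    // Its first comma-separated field is the value we want.
    last_pair.split(',').next()?.trim().parse().ok()
}
```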

Created function for processing

The function uses two HashMaps: one stores the largest label for each RefContigID, and the other stores a vector of the AlLine structs that have that label.

(I prefer storing structs rather than raw lines because I intend to use the struct later on.)

While iterating through the file, I parse each line into an AlLine struct and compute its last_position. If the struct's ref_contig_id is not yet in the maps, I insert it into both (the label into the first, a one-element vector into the second). Otherwise I compare the new label with the stored one: if the new label is larger, I overwrite the entry in both maps; if it is equal, I append the struct to the vector in the second map:

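use std::collections::HashMap;
use std::error::Error;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::PathBuf;

fn process_file(file_path: &PathBuf) -> Result<Vec<AlLine>, Box<dyn Error>> {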

    // Two hashmaps
    let mut largest_labels: HashMap<u8, u32> = HashMap::new();
    let mut last_alignments: HashMap<u8, Vec<AlLine>> = HashMap::new();

    let file = File::open(file_path)?;
    let reader = BufReader::new(file);

    for line in reader.lines() {
        let line = line?;
        if line.starts_with('#') {
            continue; // Skip comment lines
        }

        let alline = AlLine::from_line(&line);
        let ref_contig_id = alline.ref_contig_id;
        // first_label not yet used but will be
        // need to create two new hashmaps


        // matching logic
        let last_label: u32 =  alline.last_position();
        match largest_labels.get(&ref_contig_id){
            Some(&max_label) => {
                if last_label > max_label {
                    // Overwrite previous
                    largest_labels.insert(ref_contig_id,last_label);
                    last_alignments.insert(ref_contig_id,vec![alline]);
                } else if last_label == max_label{
                    last_alignments.entry(ref_contig_id).or_insert(vec![]).push(alline);
                }
            }
            None => {
                // Making a new entry
                largest_labels.insert(ref_contig_id,last_label);
                last_alignments.insert(ref_contig_id,vec![alline]);
            }
        }
    }
    let mut merged: Vec<AlLine> = Vec::new();
    for (_, lines) in last_alignments.into_iter() {
        merged.extend(lines);
    }
    Ok(merged)
}

(Finally, I merge the vectors into a single Vec and return it.)
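A minimal sketch of the same max-and-ties bookkeeping with a single map instead of two, using the HashMap entry API. record_max is a hypothetical helper, and storing (label, lines) together per key is a design choice not taken from the code above; it means each line needs only one lookup, and a parallel version would have only one structure to protect or merge:

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

/// Hypothetical helper: records `item` under `key` if `label` is at least
/// the largest label seen so far for that key.
fn record_max<K, T>(best: &mut HashMap<K, (u32, Vec<T>)>, key: K, label: u32, item: T)
where
    K: std::hash::Hash + Eq,
{
    match best.entry(key) {
        Entry::Vacant(slot) => {
            // First line for this key.
            slot.insert((label, vec![item]));
        }
        Entry::Occupied(mut slot) => {
            let (max_label, items) = slot.get_mut();
            if label > *max_label {
                // Strictly larger label: discard the old lines.
                *max_label = label;
                items.clear();
                items.push(item);
            } else if label == *max_label {
                // Tie: keep this line as well.
                items.push(item);
            }
            // Smaller labels are ignored.
        }
    }
}
```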

Now I want to parallelize this with the rayon crate and, to be honest, I'm struggling.

I've started by using rayon:

use rayon::ThreadPoolBuilder;
use rayon::prelude::*; 

fn process_parallel(file_path: &PathBuf) -> Result<HashMap<u8, Vec<AlLine>>, Box<dyn Error>> {
    // Restrict Rayon to use only 2 threads
    ThreadPoolBuilder::new().num_threads(2).build_global().unwrap();

    // Rest of the function remains unchanged
    let mut largest_labels: HashMap<u8, u32> = HashMap::new();
    let mut last_alignments: HashMap<u8, Vec<AlLine>> = HashMap::new();

    let file = File::open(file_path)?;
    let reader = BufReader::new(file);

    // Parallelize the iteration over lines
    reader
        .lines()
        .par_bridge() // par bridge needed instead of par_iter
        .filter_map(|line| {
            if let Ok(ref line) = line {
                if !line.starts_with('#') {
                    Some(line.as_str()) // Convert to &str
                } else {
                    None
                }
            } else {
                None
            }
        })
        .for_each(|line| {
          
            let alline = AlLine::from_line(&line);
            let ref_contig_id = alline.ref_contig_id;

            // matching logic
            let last_label: u32 = alline.last_position();
            match largest_labels.get(&ref_contig_id) {
                Some(&max_label) => {
                    if last_label > max_label {
                        // Overwrite previous
                        largest_labels.insert(ref_contig_id, last_label);
                        last_alignments.insert(ref_contig_id, vec![alline]);
                    } else if last_label == max_label {
                        last_alignments
                            .entry(ref_contig_id)
                            .or_insert_with(Vec::new)
                            .push(alline);
                    }
                }
                None => {
                    // Making a new entry
                    largest_labels.insert(ref_contig_id, last_label);
                    last_alignments.insert(ref_contig_id, vec![alline]);
                }
            }     
        });

    Ok(last_alignments)
}

I'm using par_bridge to iterate over the reader's lines in parallel and filter_map to filter out comments, and I've simply copied the logic of the single-threaded solution (minus the merging) into for_each. However, I get two errors I haven't been able to get past:

error[E0596]: cannot borrow `last_alignments` as mutable, as it is a captured variable in a `Fn` closure
   --> src\main.rs:137:25
    |
137 |                         last_alignments

and:

error[E0515]: cannot return value referencing local data `line.0`
113 |             if let Ok(ref line) = line {
    |                       -------- `line.0` is borrowed here
114 |                 if !line.starts_with('#') {
115 |                     Some(line.as_str()) // Convert to &str
    |                     ^^^^^^^^^^^^^^^^^^^ returns a value referencing data owned by the current function

How should I proceed? I've tried some ChatGPT-suggested solutions using DashMap and similar, but they only made things worse.

EDIT: I've implemented what ChatGPT and @ByteVoyager suggested.

First off, I've replaced:

.filter_map(|line| {
            if let Ok(ref line) = line {
                if !line.starts_with('#') {
                    Some(line.as_str()) // Convert to &str
                } else {
                    None
                }
            } else {
                None
            }
        })

with the following:

.filter_map(Result::ok)
.filter(|line| !line.starts_with('#'))

The filter_map(Result::ok) step is needed to turn each Result<String, io::Error> into a String. I've also replaced every HashMap with a DashMap (dashmap = "5.1.0" in Cargo.toml). The error I got was:

error[E0308]: mismatched types
   --> src\main.rs:124:33
    |
124 |                 if last_label > max_label {
    |                    ----------   ^^^^^^^^^ expected `u32`, found `Ref<'_, u16, u32>`
    |                    |
    |                    expected because this is `u32`
    |
    = note: expected type `u32`
             found struct `dashmap::mapref::one::Ref<'_, u16, u32>`
help: consider dereferencing the type
 if last_label > *max_label {

So I did just that. The problem is that it now runs but stops producing output (I've added a println!() in all three branches): after a variable number of iterations, it stops printing anything. I have no idea how to proceed further.
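One way to avoid shared mutable state entirely is a map-reduce: each worker folds its share of the lines into its own map, and the partial maps are merged at the end. In rayon this roughly corresponds to calling fold (per-thread accumulators) followed by reduce on the parallel iterator. The sketch below uses only std::thread::scope (Rust 1.63+) so it runs without extra crates; it assumes the lines have already been parsed into (RefContigID, label, line) tuples, and the names Best, fold_line, merge and best_per_key are hypothetical:

```rust
use std::collections::HashMap;
use std::thread;

/// Per RefContigID: the largest label seen so far and every line that has it.
type Best = HashMap<u8, (u32, Vec<String>)>;

/// Fold one parsed line into a thread-local map (no sharing, no locks).
fn fold_line(mut acc: Best, key: u8, label: u32, line: String) -> Best {
    let entry = acc.entry(key).or_insert((label, Vec::new()));
    if label > entry.0 {
        // Strictly larger label: replace the old lines.
        *entry = (label, vec![line]);
    } else if label == entry.0 {
        // Tie (or freshly inserted entry): keep this line too.
        entry.1.push(line);
    }
    acc
}

/// Merge two partial results with the same max-and-ties rule.
fn merge(mut a: Best, b: Best) -> Best {
    for (key, (label, lines)) in b {
        let entry = a.entry(key).or_insert((label, Vec::new()));
        if label > entry.0 {
            *entry = (label, lines);
        } else if label == entry.0 {
            entry.1.extend(lines);
        }
    }
    a
}

/// Split the records into chunks, fold each chunk on its own scoped
/// thread, then reduce the partial maps with `merge`.
fn best_per_key(records: Vec<(u8, u32, String)>, n_threads: usize) -> Best {
    let n = n_threads.max(1);
    // At least 1, because `chunks(0)` panics.
    let chunk_size = ((records.len() + n - 1) / n).max(1);
    thread::scope(|s| {
        let mut handles = Vec::new();
        for chunk in records.chunks(chunk_size) {
            handles.push(s.spawn(move || {
                chunk
                    .iter()
                    .cloned()
                    .fold(Best::new(), |acc, (key, label, line)| {
                        fold_line(acc, key, label, line)
                    })
            }));
        }
        handles
            .into_iter()
            .map(|h| h.join().expect("worker thread panicked"))
            .fold(Best::new(), merge)
    })
}
```

Because each thread owns its map, no locks are taken while folding, and merging is cheap when there are few distinct RefContigIDs. Reading lines from a single file is still sequential, so this likely only pays off when parsing dominates.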

Answers (1)

ByteVoyager

Problem 1:

error[E0596]: cannot borrow `last_alignments` as mutable, as it is a captured variable in a `Fn` closure

Essentially, the issue here is that rayon's for_each may run the closure on several threads at once, so it requires an Fn closure (that is also Send and Sync), and such a closure is not allowed to mutate captured state directly. Imagine two different threads inserting into the HashMap at the same time: that would be a data race, which is undefined behavior.

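There are a couple of ways to solve this problem. First, you could wrap your HashMap in a Mutex. Then, if two threads attempt to insert into the HashMap at the same time, one will block until the other has finished. You could also use DashMap, as ChatGPT suggested. This works because DashMap's insert method takes a shared reference to self, so from the borrow checker's point of view it doesn't count as mutation. Switching to DashMap is mostly a matter of changing HashMap::new to DashMap::new, but note that DashMap::get returns a Ref guard rather than a plain reference, and the DashMap documentation warns that insert may deadlock if called while holding any reference into the same map (for example, calling insert inside a match on the result of get).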
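A minimal sketch of the Mutex approach using std only (std::thread::scope instead of rayon, Rust 1.63+). shared_max is a hypothetical name and, for brevity, it tracks only the largest label per key rather than the matching lines. The important part is that the lookup and the update happen while holding one lock, so no other thread can change the entry in between:

```rust
use std::collections::HashMap;
use std::sync::Mutex;
use std::thread;

/// Hypothetical: several threads update one shared map. Each thread only
/// needs `&Mutex<...>`, so nothing is captured by `&mut`.
fn shared_max(records: &[(u8, u32)], n_threads: usize) -> HashMap<u8, u32> {
    let best: Mutex<HashMap<u8, u32>> = Mutex::new(HashMap::new());
    let n = n_threads.max(1);
    // At least 1, because `chunks(0)` panics.
    let chunk_size = ((records.len() + n - 1) / n).max(1);
    thread::scope(|s| {
        for chunk in records.chunks(chunk_size) {
            let best = &best;
            s.spawn(move || {
                for &(key, label) in chunk {
                    // Lookup and update under the same guard.
                    let mut map = best.lock().unwrap();
                    let max = map.entry(key).or_insert(label);
                    if label > *max {
                        *max = label;
                    }
                    // The guard is dropped here, releasing the lock.
                }
            });
        }
    });
    // All scoped threads have been joined, so the Mutex can be unwrapped.
    best.into_inner().unwrap()
}
```

Because every update takes the same lock, the threads largely take turns; per-thread maps merged at the end usually contend less.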

Problem 2:

error[E0515]: cannot return value referencing local data `line.0`
113 |             if let Ok(ref line) = line {
    |                       -------- `line.0` is borrowed here
114 |                 if !line.starts_with('#') {
115 |                     Some(line.as_str()) // Convert to &str
    |                     ^^^^^^^^^^^^^^^^^^^ returns a value referencing data owned by the current function

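I don't understand why you are trying to convert from String to &str here: line.as_str() borrows from a String owned by the closure, so that &str cannot be returned from it. The code should work if you keep the owned String and replace the filter_map with a regular filter, after first unwrapping the Result (otherwise line is an io::Result<String>, which has no starts_with method):

.filter_map(Result::ok)
.filter(|line| !line.starts_with('#'))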
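A small std-only sketch of the owned-String chain (data_lines is a hypothetical name; std::io::Cursor stands in for the BufReader<File>). After filter_map(Result::ok) each item is an owned String, and filter only borrows it for the predicate, so nothing that borrows a temporary is returned. Note that filter_map(Result::ok) silently drops read errors:

```rust
use std::io::BufRead;

/// Returns the non-comment lines of `reader` as owned Strings.
fn data_lines<R: BufRead>(reader: R) -> Vec<String> {
    reader
        .lines()                               // yields io::Result<String>
        .filter_map(Result::ok)                // keep the owned String, drop errors
        .filter(|line| !line.starts_with('#')) // `line` is &String here
        .collect()
}
```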
