Reputation: 438
Here is the problem. I have a large TSV file (some lines start with #, and I ignore those). The TSV file has 5 columns; the last column is a string that looks like (1,2)(3,4)(7,12) etc., and the first column is a non-unique ID (let's call it RefContigID). For each line, I want to find the first number of the last pair (7 in the previous example) and, for each ID, store the lines that have the largest such value.
To that end, I've implemented the following solution in Rust.
The struct implements from_line, which builds it from a line, and last_position, which strips the outer parentheses, splits by )(, takes the last element, splits it by , and takes the first element.
#[derive(Deserialize)]
struct AlLine {
    qry_contig_id: u32,
    ref_contig_id: u8,
    ref_start: f64,
    ref_end: f64,
    conf: f64,
    align: String,
}
impl AlLine {
    fn from_line(line: &str) -> Self {
        let fields: Vec<&str> = line.split('\t').collect();
        let qry_contig_id: u32 = fields[0].parse().unwrap();
        let ref_contig_id: u8 = fields[1].parse().unwrap();
        let ref_start: f64 = fields[2].parse().unwrap();
        let ref_end: f64 = fields[3].parse().unwrap();
        let conf: f64 = fields[4].parse().unwrap();
        let align: String = fields[5].to_string();
        AlLine { qry_contig_id, ref_contig_id, ref_start, ref_end, conf, align }
    }
    fn last_position(&self) -> u32 {
        let align = &self.align;
        let last_position: u32 = align[1..align.len() - 1]
            .split(")(")
            .last()
            .unwrap()
            .split(",")
            .next()
            .unwrap()
            .parse()
            .unwrap();
        last_position
    }
}
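As a side note on the parsing step described above, here is a minimal sketch of a variant that returns an Option instead of panicking on a malformed column. The free function name last_pair_first is hypothetical (it is not part of the code in the question), and it assumes the column always ends with a closing parenthesis.

```rust
/// Returns the first number of the last "(a,b)" pair, or None if the
/// string does not have the expected shape.
/// `last_pair_first` is a hypothetical helper, not part of the original code.
fn last_pair_first(align: &str) -> Option<u32> {
    // Everything after the final '(' belongs to the last pair, e.g. "7,12)".
    let (_, tail) = align.trim_end().rsplit_once('(')?;
    let inner = tail.strip_suffix(')')?;
    // The first element is everything before the comma.
    let (first, _) = inner.split_once(',')?;
    first.trim().parse().ok()
}

fn main() {
    println!("{:?}", last_pair_first("(1,2)(3,4)(7,12)"));
    println!("{:?}", last_pair_first("not a pair list"));
}
```

Returning an Option lets the caller decide whether to skip a bad line or abort, rather than having unwrap() decide for it.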
The function uses two HashMaps: one stores the largest label seen for each RefContigID, and the other stores a vector of AlLine structs that have that label. (I prefer storing structs rather than raw lines because I intend to use the struct later on.)
While iterating through the file, I parse each line into an AlLine struct and compute its last_position. If the struct's ID is not yet in the hashmaps, I insert it into both with the respective values (the integer and a vector holding the AlLine). Otherwise I compare with the stored value: if the new value is larger, I overwrite the entry in both hashmaps, and if it is equal, I simply append the struct to the vector in the second hashmap:
fn process_file(file_path: &PathBuf) -> Result<Vec<AlLine>, Box<dyn Error>> {
    // Two hashmaps
    let mut largest_labels: HashMap<u8, u32> = HashMap::new();
    let mut last_alignments: HashMap<u8, Vec<AlLine>> = HashMap::new();
    let file = File::open(file_path)?;
    let reader = BufReader::new(file);
    for line in reader.lines() {
        let line = line?;
        if line.starts_with('#') {
            continue; // Skip comment lines
        }
        let alline = AlLine::from_line(&line);
        let ref_contig_id = alline.ref_contig_id;
        // first_label not yet used but will be
        // need to create two new hashmaps
        // matching logic
        let last_label: u32 = alline.last_position();
        match largest_labels.get(&ref_contig_id) {
            Some(&max_label) => {
                if last_label > max_label {
                    // Overwrite previous
                    largest_labels.insert(ref_contig_id, last_label);
                    last_alignments.insert(ref_contig_id, vec![alline]);
                } else if last_label == max_label {
                    last_alignments.entry(ref_contig_id).or_insert(vec![]).push(alline);
                }
            }
            None => {
                // Making a new entry
                largest_labels.insert(ref_contig_id, last_label);
                last_alignments.insert(ref_contig_id, vec![alline]);
            }
        }
    }
    let mut merged: Vec<AlLine> = Vec::new();
    for (_, lines) in last_alignments.into_iter() {
        merged.extend(lines);
    }
    Ok(merged)
}
(I then merge them and return said value).
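The same compare-and-update logic can also be kept in a single map, so that the maximum and its lines can never get out of step. The following is only a sketch under assumptions: Rec is a hypothetical stand-in for AlLine with the label already computed, and keep_largest is a made-up name.

```rust
use std::cmp::Ordering;
use std::collections::HashMap;

/// Hypothetical stand-in for `AlLine`: an ID plus its already-computed label.
#[derive(Debug, Clone, PartialEq)]
struct Rec {
    id: u8,
    label: u32,
}

/// Keeps, for each ID, only the records whose label equals the maximum seen.
fn keep_largest(records: impl IntoIterator<Item = Rec>) -> HashMap<u8, (u32, Vec<Rec>)> {
    let mut best: HashMap<u8, (u32, Vec<Rec>)> = HashMap::new();
    for rec in records {
        let label = rec.label;
        // A new ID starts with its own label and an empty Vec, so the first
        // record falls into the `Equal` arm below and is pushed.
        let entry = best.entry(rec.id).or_insert_with(|| (label, Vec::new()));
        match label.cmp(&entry.0) {
            Ordering::Greater => *entry = (label, vec![rec]), // new maximum
            Ordering::Equal => entry.1.push(rec),             // tie: append
            Ordering::Less => {}                              // smaller: drop
        }
    }
    best
}

fn main() {
    let recs = vec![
        Rec { id: 1, label: 7 },
        Rec { id: 1, label: 9 },
        Rec { id: 2, label: 3 },
        Rec { id: 1, label: 9 },
    ];
    for (id, (max, lines)) in keep_largest(recs) {
        println!("id {id}: max {max}, {} line(s)", lines.len());
    }
}
```

With one map there is a single lookup per line, and the three cases (larger, equal, smaller) are handled in one match.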
Now I want to parallelize this with the rayon crate and, to be honest, I'm struggling.
I've started by using rayon:
use rayon::ThreadPoolBuilder;
use rayon::prelude::*;
fn process_parallel(file_path: &PathBuf) -> Result<HashMap<u8, Vec<AlLine>>, Box<dyn Error>> {
    // Restrict Rayon to use only 2 threads
    ThreadPoolBuilder::new().num_threads(2).build_global().unwrap();
    // Rest of the function remains unchanged
    let mut largest_labels: HashMap<u8, u32> = HashMap::new();
    let mut last_alignments: HashMap<u8, Vec<AlLine>> = HashMap::new();
    let file = File::open(file_path)?;
    let reader = BufReader::new(file);
    // Parallelize the iteration over lines
    reader
        .lines()
        .par_bridge() // par bridge needed instead of par_iter
        .filter_map(|line| {
            if let Ok(ref line) = line {
                if !line.starts_with('#') {
                    Some(line.as_str()) // Convert to &str
                } else {
                    None
                }
            } else {
                None
            }
        })
        .for_each(|line| {
            let alline = AlLine::from_line(&line);
            let ref_contig_id = alline.ref_contig_id;
            // matching logic
            let last_label: u32 = alline.last_position();
            match largest_labels.get(&ref_contig_id) {
                Some(&max_label) => {
                    if last_label > max_label {
                        // Overwrite previous
                        largest_labels.insert(ref_contig_id, last_label);
                        last_alignments.insert(ref_contig_id, vec![alline]);
                    } else if last_label == max_label {
                        last_alignments
                            .entry(ref_contig_id)
                            .or_insert_with(Vec::new)
                            .push(alline);
                    }
                }
                None => {
                    // Making a new entry
                    largest_labels.insert(ref_contig_id, last_label);
                    last_alignments.insert(ref_contig_id, vec![alline]);
                }
            }
        });
    Ok(last_alignments)
}
I'm filtering out comments using filter_map, iterating over the reader via par_bridge, and simply copying the logic of the single-threaded solution (sans the merging) into for_each.
However, I get two errors I've been unable to get past:
error[E0596]: cannot borrow `last_alignments` as mutable, as it is a captured variable in a `Fn` closure
--> src\main.rs:137:25
|
137 | last_alignments
and:
error[E0515]: cannot return value referencing local data `line.0`
113 | if let Ok(ref line) = line {
| -------- `line.0` is borrowed here
114 | if !line.starts_with('#') {
115 | Some(line.as_str()) // Convert to &str
| ^^^^^^^^^^^^^^^^^^^ returns a value referencing data owned by the current function
How should I proceed further? I've tried some ChatGPT-assisted solutions using DashMap and similar, but they just broke the result further.
EDIT: I've implemented what ChatGPT and @ByteVoyager suggested.
First off, I've replaced:
.filter_map(|line| {
    if let Ok(ref line) = line {
        if !line.starts_with('#') {
            Some(line.as_str()) // Convert to &str
        } else {
            None
        }
    } else {
        None
    }
})
with the following:
.filter_map(Result::ok)
.filter(|line| !line.starts_with('#'))
The filter_map is needed to convert Result<String, Error> to String.
I've also replaced all instances of HashMap with DashMap (dashmap = "5.1.0" in the TOML file). The error I got was:
error[E0308]: mismatched types
--> src\main.rs:124:33
|
124 | if last_label > max_label {
| ---------- ^^^^^^^^^ expected `u32`, found `Ref<'_, u16, u32>`
| |
| expected because this is `u32`
|
= note: expected type `u32`
found struct `dashmap::mapref::one::Ref<'_, u16, u32>`
help: consider dereferencing the type
if last_label > *max_label {
So I did just that. The problem is that the program runs but stops producing output: I've added println!() calls in all three branches, and after a varying amount of output it stops producing anything.
I have no idea how to proceed further.
Upvotes: 0
Views: 124
Reputation: 121
error[E0596]: cannot borrow `last_alignments` as mutable, as it is a captured variable in a `Fn` closure
Essentially, the issue here is that rayon's for_each may call the closure from several threads at once, so it requires a Fn closure, which is not allowed to directly mutate anything it captures from the surrounding state. Imagine if two different threads attempted to insert into the HashMap at the same time: that would be a data race, which is undefined behavior.
There are a couple of ways to solve this problem. Firstly, you could wrap your HashMap in a Mutex. Then, if two threads attempt to insert into the HashMap at the same time, one will block until the other has finished. You could also use DashMap, as ChatGPT suggested. The reason this works is that DashMap's insert method takes a shared reference to self, so the closure does not need a mutable borrow of the map. Switching to DashMap should be as simple as changing HashMap::new to DashMap::new.
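To illustrate the Mutex route, here is a minimal sketch. It makes several assumptions: it uses std::thread::scope instead of rayon so that it compiles without external crates, Rec is a hypothetical (id, label) pair standing in for a parsed AlLine, and largest_per_id is a made-up name. The point it tries to show is that the comparison and the update happen under one lock, on one map, so no other thread can slip in between them. That may also matter for the DashMap attempt: as far as I know, the DashMap documentation warns that insert can deadlock while a reference obtained from get on the same map is still alive, which would be one plausible explanation for the program hanging.

```rust
use std::collections::HashMap;
use std::sync::Mutex;
use std::thread;

// Hypothetical (id, label) records standing in for parsed `AlLine` values.
type Rec = (u8, u32);

fn largest_per_id(records: &[Rec], n_threads: usize) -> HashMap<u8, (u32, Vec<Rec>)> {
    // One map holds both the current maximum and the records that reach it.
    let best: Mutex<HashMap<u8, (u32, Vec<Rec>)>> = Mutex::new(HashMap::new());
    let n = n_threads.max(1);
    let chunk_len = ((records.len() + n - 1) / n).max(1);
    thread::scope(|s| {
        for chunk in records.chunks(chunk_len) {
            let best = &best; // share the Mutex by reference with each thread
            s.spawn(move || {
                for &rec in chunk {
                    // Compare and update while holding the lock, so the
                    // check cannot be invalidated by another thread.
                    let mut map = best.lock().unwrap();
                    let entry = map.entry(rec.0).or_insert_with(|| (rec.1, Vec::new()));
                    if rec.1 > entry.0 {
                        *entry = (rec.1, vec![rec]);
                    } else if rec.1 == entry.0 {
                        entry.1.push(rec);
                    }
                }
            });
        }
    });
    best.into_inner().unwrap()
}

fn main() {
    let recs = [(1u8, 7u32), (1, 9), (2, 3), (1, 9), (1, 4)];
    let best = largest_per_id(&recs, 2);
    println!("max for id 1: {}", best[&1].0);
}
```

With rayon, the body of the inner loop would go inside for_each in the same way. Taking the lock for every line serializes the updates, so if that turns out to be a bottleneck, building one map per thread and merging them at the end (rayon's fold and reduce) is another option.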
error[E0515]: cannot return value referencing local data `line.0`
113 | if let Ok(ref line) = line {
| -------- `line.0` is borrowed here
114 | if !line.starts_with('#') {
115 | Some(line.as_str()) // Convert to &str
| ^^^^^^^^^^^^^^^^^^^ returns a value referencing data owned by the current function
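I don't understand why you are trying to convert from String to &str here. The returned &str borrows from a String that is dropped when the closure returns, which is what the error is complaining about. The code should work if you keep the owned String: unwrap the Result with filter_map(Result::ok), then skip comments with a regular filter.
.filter_map(Result::ok)
.filter(|line| !line.starts_with('#'))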
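A small self-contained sketch of that chain on a plain (non-parallel) iterator, reading from an in-memory Cursor instead of a file; data_lines is a hypothetical name used only for illustration:

```rust
use std::io::{BufRead, Cursor};

/// Collects the non-comment lines of any buffered reader.
/// Lines that fail to read are silently dropped by `filter_map(Result::ok)`.
fn data_lines<R: BufRead>(reader: R) -> Vec<String> {
    reader
        .lines() // yields io::Result<String>
        .filter_map(Result::ok) // keep the owned String of each successful read
        .filter(|line| !line.starts_with('#')) // `line` is a &String here
        .collect()
}

fn main() {
    let input = Cursor::new("# header\n1\t2\n# note\n3\t4\n");
    for line in data_lines(input) {
        println!("{line}");
    }
}
```

Because the closure passed to filter only borrows each String and hands the owned value on unchanged, nothing is returned that references local data.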
Upvotes: 1