Reputation: 11561
I want to compute an embarrassingly parallel problem with Rayon, then reduce it down to a long stream containing maybe a hundred unique values. It looks like I can't build the hash map on the fly because mutexes cause deadlocks in Rayon. Because of that, I want to count the occurrences of each unique value in the iterator I built. Here's a minimal example:
use itertools::Itertools;
use rayon::prelude::*;

fn main() {
    let counts_par = (1..=1000000)
        .into_par_iter()
        .map(|i| i % 10);
    let counts = counts_par.counts();
    println!("{:?}", counts);
}
And the matching Cargo.toml:
[package]
name = "elo"
version = "0.1.0"
edition = "2021"
[dependencies]
itertools = "0.10.0"
rayon = "1.10.0"
I'm getting the following error:
error[E0599]: the method `counts` exists for struct `Map<Iter<{integer}>, {closure@src/main.rs:21:14: 21:17}>`, but its trait bounds were not satisfied
--> src/main.rs:22:29
|
22 | let counts = counts_par.counts();
| ^^^^^^
|
::: /home/d33tah/.cargo/registry/src/index.crates.io-6f17d22bba15001f/rayon-1.10.0/src/iter/map.rs:15:1
|
15 | pub struct Map<I: ParallelIterator, F> {
| -------------------------------------- doesn't satisfy `_: Iterator` or `_: Itertools`
|
= note: the following trait bounds were not satisfied:
`rayon::iter::Map<rayon::range_inclusive::Iter<{integer}>, {closure@src/main.rs:21:14: 21:17}>: Iterator`
which is required by `rayon::iter::Map<rayon::range_inclusive::Iter<{integer}>, {closure@src/main.rs:21:14: 21:17}>: Itertools`
`&rayon::iter::Map<rayon::range_inclusive::Iter<{integer}>, {closure@src/main.rs:21:14: 21:17}>: Iterator`
which is required by `&rayon::iter::Map<rayon::range_inclusive::Iter<{integer}>, {closure@src/main.rs:21:14: 21:17}>: Itertools`
`&mut rayon::iter::Map<rayon::range_inclusive::Iter<{integer}>, {closure@src/main.rs:21:14: 21:17}>: Iterator`
which is required by `&mut rayon::iter::Map<rayon::range_inclusive::Iter<{integer}>, {closure@src/main.rs:21:14: 21:17}>: Itertools`
How can I solve the problem, assuming that I'm working with a dataset that's too big to fit in the memory?
Upvotes: 0
Views: 91
Reputation: 27237
You can simply copy and adapt itertools' counts implementation for ParallelIterators:
use rayon::prelude::*;
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Mutex;

trait CountsExt: ParallelIterator {
    fn counts(self) -> HashMap<Self::Item, usize>
    where
        Self: Sized,
        // `Sync` is added on top of itertools' `Eq + Hash` bounds,
        // since the items are processed by Rayon's worker threads.
        Self::Item: Eq + Hash + Sync;
}

impl<T: ParallelIterator> CountsExt for T {
    fn counts(self) -> HashMap<Self::Item, usize>
    where
        Self: Sized,
        Self::Item: Eq + Hash + Sync,
    {
        // Unlike the sequential itertools version, the map is wrapped in a
        // Mutex so that worker threads can update it concurrently.
        let counts = Mutex::new(HashMap::new());
        // The lock is held only for a single entry update at a time,
        // so this cannot deadlock.
        self.for_each(|item| *counts.lock().unwrap().entry(item).or_default() += 1);
        counts.into_inner().unwrap()
    }
}
Upvotes: 2