d33tah

Reputation: 11561

Calculate unique values of par_iter or filter_map?

I want to solve an embarrassingly parallel problem with Rayon, then reduce the results, a long stream, to perhaps a hundred unique values. It looks like I can't build the hashmap on the fly because mutexes cause deadlocks in Rayon. Because of that, I want to count the unique values in the iterator I built. Here's a minimal example:

extern crate itertools;
use crate::itertools::Itertools;

extern crate rayon;
use rayon::prelude::*;

fn main() {
    let counts_par = (1..=1000000)
        .into_par_iter()
        .map(|i| i % 10);
    let counts = counts_par.counts();
    println!("{:?}", counts);
}

And the matching Cargo.toml:

[package]
name = "elo"
version = "0.1.0"
edition = "2021"

[dependencies]
itertools = "0.10.0"
rayon = "1.10"

I'm getting the following error:

error[E0599]: the method `counts` exists for struct `Map<Iter<{integer}>, {closure@main.rs:21:14}>`, but its trait bounds were not satisfied
  --> src/main.rs:22:29
   |
22 |     let counts = counts_par.counts();
   |                             ^^^^^^
   |
  ::: /home/d33tah/.cargo/registry/src/index.crates.io-6f17d22bba15001f/rayon-1.10.0/src/iter/map.rs:15:1
   |
15 | pub struct Map<I: ParallelIterator, F> {
   | -------------------------------------- doesn't satisfy `_: Iterator` or `_: Itertools`
   |
   = note: the following trait bounds were not satisfied:
           `rayon::iter::Map<rayon::range_inclusive::Iter<{integer}>, {closure@src/main.rs:21:14: 21:17}>: Iterator`
           which is required by `rayon::iter::Map<rayon::range_inclusive::Iter<{integer}>, {closure@src/main.rs:21:14: 21:17}>: Itertools`
           `&rayon::iter::Map<rayon::range_inclusive::Iter<{integer}>, {closure@src/main.rs:21:14: 21:17}>: Iterator`
           which is required by `&rayon::iter::Map<rayon::range_inclusive::Iter<{integer}>, {closure@src/main.rs:21:14: 21:17}>: Itertools`
           `&mut rayon::iter::Map<rayon::range_inclusive::Iter<{integer}>, {closure@src/main.rs:21:14: 21:17}>: Iterator`
           which is required by `&mut rayon::iter::Map<rayon::range_inclusive::Iter<{integer}>, {closure@src/main.rs:21:14: 21:17}>: Itertools`

How can I solve this, assuming that I'm working with a dataset that's too big to fit in memory?

Upvotes: 0

Views: 91

Answers (1)

cafce25

Reputation: 27237

You can simply copy itertools' counts implementation and adapt it to ParallelIterator; the comment lines below the code mark what changed:

use rayon::prelude::*; // brings ParallelIterator into scope
use std::collections::HashMap;
use std::sync::Mutex;
use std::hash::Hash;
trait CountsExt: ParallelIterator {
    fn counts(self) -> HashMap<Self::Item, usize>
    where
        Self: Sized,
        Self::Item: Eq + Hash + Sync;
//                           +++++++
}

impl<T: ParallelIterator> CountsExt for T {
    fn counts(self) -> HashMap<Self::Item, usize>
    where
        Self: Sized,
        Self::Item: Eq + Hash + Sync,
//                           +++++++
    {
        let counts = Mutex::new(HashMap::new());
        //  ^-mut    +++++++++++              +
        self.for_each(|item| *counts.lock().unwrap().entry(item).or_default() += 1);
        //                          ++++++++++++++++
        counts.into_inner().unwrap()
        //    ++++++++++++++++++++++
    }
}
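
If contention on the single Mutex ever becomes a concern, a common alternative is to give each worker its own map and merge the maps at the end; with Rayon that shape is usually written as fold followed by reduce. Below is a minimal sketch of the idea that uses only std::thread::scope instead of Rayon, so it compiles without extra crates. The name parallel_counts, the workers parameter and the strided split of the input are assumptions made for illustration, not part of itertools or Rayon.

```rust
use std::collections::HashMap;
use std::hash::Hash;
use std::thread;

/// Counts how often each value of `f(i)` occurs for `i` in `1..=n`.
/// Hypothetical helper: every worker fills a private map, so no lock is shared.
fn parallel_counts<K, F>(n: u64, workers: u64, f: F) -> HashMap<K, usize>
where
    K: Eq + Hash + Send,
    F: Fn(u64) -> K + Sync,
{
    let workers = workers.max(1); // avoid a zero step below
    let f = &f; // share the closure by reference across threads

    // Phase 1: each scoped thread counts its own slice of the input.
    let partials: Vec<HashMap<K, usize>> = thread::scope(|s| {
        let handles: Vec<_> = (0..workers)
            .map(|w| {
                s.spawn(move || {
                    let mut local: HashMap<K, usize> = HashMap::new();
                    // Worker `w` handles 1 + w, 1 + w + workers, and so on.
                    for i in ((1 + w)..=n).step_by(workers as usize) {
                        *local.entry(f(i)).or_insert(0) += 1;
                    }
                    local
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });

    // Phase 2: merge the per-worker maps on the calling thread.
    let mut total: HashMap<K, usize> = HashMap::new();
    for partial in partials {
        for (key, count) in partial {
            *total.entry(key).or_insert(0) += count;
        }
    }
    total
}
```

Calling parallel_counts(1_000_000, 4, |i| i % 10) corresponds to the example in the question. Each map only ever holds the unique keys, so memory use grows with the number of distinct values rather than with the length of the input.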


Upvotes: 2
