RNHTTR
RNHTTR

Reputation: 2525

Terminating threads when a condition is met

I want to efficiently search the keys of two HashMaps for a single value, and terminate both threads once the value has been found. I'm currently doing this using two separate message channels (i.e. two transmitters and two receivers), but I'm not sure this is the correct approach. Given that the "mpsc" component of mpsc::channel stands for "multiple producer, single consumer", it feels wrong to have multiple producers and multiple consumers. So, is there a better way to concurrently search two arrays?

My code Also available in the playground:

use std::collections::HashMap;
use std::array::IntoIter;
use std::thread;
use std::time::Duration;
use std::iter::FromIterator;
use std::sync::mpsc;

fn main() {
    let m1 = HashMap::<_, _>::from_iter(IntoIter::new([(1, 2), (3, 4), (5, 6), (7,8), (9, 10)]));
    let m2 = HashMap::<_, _>::from_iter(IntoIter::new([(1, 2), (3, 4), (5, 6), (7,8), (9, 10), (11, 12), (13, 14), (15, 16), (17,18), (19, 20)]));

    let (tx1, rx1) = mpsc::channel::<u8>();
    let (tx2, rx2) = mpsc::channel::<u8>();

    let handle1 = thread::spawn(move || {
        let iter_keys1 = m1.keys();
        for k in iter_keys1 {
            if k.clone() == 11u8 {
                tx2.send(*k);
                break
            } else {
                println!("Key from handle1: {}", k);
            }
            thread::sleep(Duration::from_millis(1));
        }
        for received in rx1 {
            let into: u8 = received;
            if into == 11u8 {
                println!("handle2 sent a message to receiver1: {}", into);
                break
            }
        }
        m1
    });

    let handle2 = thread::spawn(move || {
        let iter_keys2 = m2.keys();
        for k in iter_keys2 {
            if k.clone() == 11u8 {
                tx1.send(*k);
                break
            } else {
                println!("Key from handle2: {}", k);
            }
            thread::sleep(Duration::from_millis(1));
        }
        for received in rx2 {
            let into: u8 = received;
            if into == 11u8 {
                println!("handle1 sent a message to receiver2: {}", into);
                break
            }
        }
        m2
    });
    handle1.join().unwrap();
    handle2.join().unwrap();
}

Somewhat related question: Is there a practical reason to use sleep, or does that just make it easier to see the results of concurrent processing on small samples? When I comment out the thread::sleep(Duration::from_millis(1)); lines, it seems like the threads are being processed sequentially:

Key from handle1: 9
Key from handle1: 5
Key from handle1: 3
Key from handle1: 1
Key from handle1: 7
Key from handle2: 1
handle2 sent a message to receiver1: 11
Clarification:

I'm trying to search for a key that could exist in two different hash maps. In this example, I'm searching for 11 in both sets of keys, and want to terminate both threads when I've found it in either one of the sets of keys.

Upvotes: 0

Views: 121

Answers (1)

Svetlin Zarev
Svetlin Zarev

Reputation: 15673

I'm trying to search for a key that could exist in two different hash maps. In this example, I'm searching for 11 in both sets of keys, and want to terminate both threads when I've found it in either one of the sets of keys.

In that case there is no reason to use mpsc to communicate the stop condition. You can use a simple atomic bool:

use std::array::IntoIter;
use std::collections::HashMap;
use std::iter::FromIterator;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let m1 = HashMap::<_, _>::from_iter(IntoIter::new([(1, 2), (3, 4), (5, 6), (7, 8), (9, 10)]));
    let m2 = HashMap::<_, _>::from_iter(IntoIter::new([
        (1, 2),
        (3, 4),
        (5, 6),
        (7, 8),
        (9, 10),
        (11, 12),
        (13, 14),
        (15, 16),
        (17, 18),
        (19, 20),
    ]));

    let stop_signal = Arc::new(AtomicBool::new(false));

    let stop = stop_signal.clone();
    let h1 = thread::spawn(move || {
        let keys = m1.keys();
        for &k in keys {
            if stop.load(Ordering::Relaxed) {
                println!("Another thread found it!");
                break;
            }

            if k == 11u8 {
                stop.store(true, Ordering::Relaxed);
                // do something with the found key
                println!("Found by thread 1");
                break;
            }
        }
        m1
    });

    let stop = stop_signal.clone();
    let h2 = thread::spawn(move || {
        let keys = m2.keys();
        for &k in keys {
            if stop.load(Ordering::Relaxed) {
                println!("Another thread found it!");
                break;
            }

            if k == 11u8 {
                stop.store(true, Ordering::Relaxed);
                // do something with the found key
                println!("Found by thread 2");
                break;
            }
        }
        m2
    });

    h1.join().unwrap();
    h2.join().unwrap();
}

Your original code had several issues:

  • One of the threads would have been kept alive even when it finished with its map until it received a message.
  • Even if one of the thread found the key, the other would have still continued to search for it
  • There is no point in doing thread::sleep() in the loop. It does not achieve anything except for slowing down the app

Upvotes: 4

Related Questions