fmvin

Reputation: 699

How to terminate or suspend a Rust thread from another thread?

Editor's note: this example was created before Rust 1.0, and the specific types have changed or been removed since then. The general question and concepts remain valid.

I have spawned a thread with an infinite loop and timer inside.

thread::spawn(|| {
    let mut timer = Timer::new().unwrap();
    let periodic = timer.periodic(Duration::milliseconds(200));
    loop {
        periodic.recv();

        // Do my work here
    }
});

After some time, depending on certain conditions, I need to terminate this thread from another part of my program. In other words, I want to exit from the infinite loop. How can I do this correctly? Additionally, how could I suspend this thread and resume it later?

I tried to use a global unsafe flag to break the loop, but I think this solution does not look nice.

Upvotes: 46

Views: 51962

Answers (4)

Chayim Friedman

Reputation: 71525

For terminating threads, channels are far more expensive than a simple Arc<AtomicBool>. You can wrap the flag in a nice API too:

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

#[derive(Clone)]
pub struct CancellationToken {
    cancelled: Arc<AtomicBool>,
}

impl CancellationToken {
    #[inline]
    pub fn should_cancel(&self) -> bool {
        self.cancelled.load(Ordering::Acquire)
    }
}

#[derive(Clone)]
pub struct Canceller {
    cancelled: Arc<AtomicBool>,
}

impl Canceller {
    #[inline]
    pub fn cancel(&self) {
        self.cancelled.store(true, Ordering::Release);
    }
}

#[inline]
pub fn cancellation_token() -> (Canceller, CancellationToken) {
    let cancelled = Arc::new(AtomicBool::new(false));
    (
        Canceller {
            cancelled: Arc::clone(&cancelled),
        },
        CancellationToken { cancelled },
    )
}

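Then pass the CancellationToken to your thread and have it check should_cancel() periodically, for example once per loop iteration. The controlling thread keeps the Canceller and calls cancel() when the worker should stop.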
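As a rough sketch of how the worker side might look, here is a loop that polls the flag once per iteration. To stay self-contained it uses the raw Arc<AtomicBool> instead of the wrapper types above; spawn_worker and the 5 ms sleep are hypothetical stand-ins for the real work.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical helper: spawns a worker that loops until `cancelled` is set
/// and returns how many iterations it completed.
fn spawn_worker(cancelled: Arc<AtomicBool>) -> thread::JoinHandle<u64> {
    thread::spawn(move || {
        let mut iterations = 0u64;
        // Same load that `CancellationToken::should_cancel()` performs.
        while !cancelled.load(Ordering::Acquire) {
            // Placeholder for the periodic work.
            iterations += 1;
            thread::sleep(Duration::from_millis(5));
        }
        iterations
    })
}

fn main() {
    let cancelled = Arc::new(AtomicBool::new(false));
    let worker = spawn_worker(Arc::clone(&cancelled));

    thread::sleep(Duration::from_millis(50));
    // Same store that `Canceller::cancel()` performs.
    cancelled.store(true, Ordering::Release);

    let iterations = worker.join().expect("worker panicked");
    println!("worker stopped after {iterations} iterations");
}
```

The worker only notices the flag between iterations, so the shutdown latency is bounded by the length of one iteration.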

Upvotes: 6

cloudsurfin

Reputation: 2587

Having been back to this question several times myself, here's what I think addresses OP's intent and others' best practice of getting the thread to stop itself. Building on the accepted answer, Crossbeam is a nice upgrade to mpsc because both channel endpoints, including the receiver, can be cloned and moved. It also has a convenient tick function. The real point here is that it has try_recv(), which is non-blocking.

I'm not sure how universally useful it'd be to put a message checker in the middle of an operational loop like this. I haven't found that Actix (or previously Akka) could really stop a thread without--as stated above--getting the thread to do it itself. So this is what I'm using for now (wide open to correction here, still learning myself).

// Cargo.toml:
// [dependencies]
// crossbeam-channel = "0.4.4"

use crossbeam_channel::{tick, unbounded, Receiver, Sender};
use std::time::{Duration, Instant};

fn main() {
    let (tx, rx): (Sender<String>, Receiver<String>) = unbounded();
    // crossbeam allows clone and move of receiver
    let rx2 = rx.clone();

    let worker = std::thread::spawn(move || {
        // OP:
        // let mut timer = Timer::new().unwrap();
        // let periodic = timer.periodic(Duration::milliseconds(200));

        let ticker: Receiver<Instant> = tick(Duration::from_millis(500));

        loop {
            // OP:
            // periodic.recv();
            crossbeam_channel::select! {
                recv(ticker) -> _ => {
                    // OP: Do my work here
                    println!("Hello, work.");

                    // Comms check: keep doing work?
                    // try_recv is non-blocking
                    match rx2.try_recv() {
                        Ok(msg) if msg == "END_THE_WORLD" => {
                            println!("Ending the world.");
                            break;
                        }
                        // Any other message, an empty channel, or a
                        // dropped sender: keep working.
                        _ => {}
                    }
                }
            }
        }
    });

    // Let the work continue for 10 seconds, then tell that thread to end.
    std::thread::sleep(Duration::from_secs(10));
    println!("Goodbye, world.");
    tx.send("END_THE_WORLD".to_string())
        .expect("worker has hung up");
    // Wait for the worker to see the message; otherwise main may exit first.
    worker.join().expect("worker panicked");
}

Using strings as messages is a tad cringeworthy to me. An enum would be a better fit, and it could carry the suspend and resume commands as well.
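One way the enum idea might look is sketched below. To keep it free of external crates it assumes std::sync::mpsc rather than crossbeam-channel; the Command enum and the worker function are hypothetical names made up for illustration.

```rust
use std::sync::mpsc::{self, Receiver, TryRecvError};
use std::thread;
use std::time::Duration;

/// Hypothetical control messages replacing the string protocol.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Command {
    Suspend,
    Resume,
    Stop,
}

/// Runs until `Stop` arrives or every sender is dropped.
/// Returns the number of work iterations performed.
fn worker(rx: Receiver<Command>) -> u32 {
    let mut ticks = 0;
    loop {
        match rx.try_recv() {
            Ok(Command::Stop) | Err(TryRecvError::Disconnected) => return ticks,
            Ok(Command::Suspend) => loop {
                // Block without busy-waiting until told to resume or stop.
                match rx.recv() {
                    Ok(Command::Resume) => break,
                    Ok(Command::Suspend) => {} // already suspended
                    Ok(Command::Stop) | Err(_) => return ticks,
                }
            },
            // Nothing to do: either a stray Resume or no message at all.
            Ok(Command::Resume) | Err(TryRecvError::Empty) => {}
        }
        // Placeholder for the periodic work.
        ticks += 1;
        thread::sleep(Duration::from_millis(10));
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || worker(rx));

    thread::sleep(Duration::from_millis(50));
    tx.send(Command::Suspend).expect("worker is gone");
    thread::sleep(Duration::from_millis(50)); // the worker is idle here
    tx.send(Command::Resume).expect("worker is gone");
    thread::sleep(Duration::from_millis(50));
    tx.send(Command::Stop).expect("worker is gone");

    let ticks = handle.join().expect("worker panicked");
    println!("worker did {ticks} iterations");
}
```

With an enum the compiler checks that every command is handled, which a string comparison cannot do.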

Upvotes: 3

Rajeev Ranjan

Reputation: 4106

The ideal solution would be a Condvar. You can use Condvar::wait_timeout from the std::sync module, as pointed out by @Vladimir Matveev.

This is the example from the documentation:

use std::sync::{Arc, Mutex, Condvar};
use std::thread;
use std::time::Duration;

let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = pair.clone();

thread::spawn(move|| {
    let &(ref lock, ref cvar) = &*pair2;
    let mut started = lock.lock().unwrap();
    *started = true;
    // We notify the condvar that the value has changed.
    cvar.notify_one();
});

// wait for the thread to start up
let &(ref lock, ref cvar) = &*pair;
let mut started = lock.lock().unwrap();
// as long as the value inside the `Mutex` is false, we wait
loop {
    let result = cvar.wait_timeout(started, Duration::from_millis(10)).unwrap();
    // 10 milliseconds have passed, or maybe the value changed!
    started = result.0;
    if *started == true {
        // We received the notification and the value has been updated, we can leave.
        break
    }
}
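Applied to the question's periodic loop, the same idea might look like the sketch below. It is only an illustration under a few assumptions: run_periodic is a hypothetical helper, the shared bool means "stop requested", and it uses Condvar::wait_timeout_while (available since Rust 1.42) instead of wait_timeout so that spurious wake-ups are handled by the standard library.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical helper: does one unit of work per `period` until the shared
/// flag becomes true. Returns the number of iterations performed.
fn run_periodic(pair: Arc<(Mutex<bool>, Condvar)>, period: Duration) -> u32 {
    let (lock, cvar) = &*pair;
    let mut iterations = 0;
    loop {
        let stop = lock.lock().unwrap();
        // Sleeps for up to `period`, but wakes early once the flag is set.
        let (stop, _timeout) = cvar
            .wait_timeout_while(stop, period, |stop| !*stop)
            .unwrap();
        if *stop {
            return iterations;
        }
        // Release the mutex before doing the work.
        drop(stop);
        iterations += 1;
    }
}

fn main() {
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let worker = {
        let pair = Arc::clone(&pair);
        thread::spawn(move || run_periodic(pair, Duration::from_millis(20)))
    };

    thread::sleep(Duration::from_millis(100));
    {
        // Request termination and wake the worker immediately.
        let (lock, cvar) = &*pair;
        *lock.lock().unwrap() = true;
        cvar.notify_one();
    }

    let iterations = worker.join().expect("worker panicked");
    println!("worker ran {iterations} iterations");
}
```

Unlike a plain sleep followed by a flag check, the worker reacts to the stop request without waiting for the current period to elapse.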

Upvotes: 10

Vladimir Matveev

Reputation: 128111

For both terminating and suspending a thread you can use channels.

Terminated externally

On each iteration of the worker loop, we check whether someone has notified us through a channel. If so, or if the other end of the channel has been dropped, we break out of the loop.

use std::io::{self, BufRead};
use std::sync::mpsc::{self, TryRecvError};
use std::thread;
use std::time::Duration;

fn main() {
    println!("Press enter to terminate the child thread");
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || loop {
        println!("Working...");
        thread::sleep(Duration::from_millis(500));
        match rx.try_recv() {
            Ok(_) | Err(TryRecvError::Disconnected) => {
                println!("Terminating.");
                break;
            }
            Err(TryRecvError::Empty) => {}
        }
    });

    let mut line = String::new();
    let stdin = io::stdin();
    let _ = stdin.lock().read_line(&mut line);

    let _ = tx.send(());
}
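A possible variation, sketched here as an assumption rather than as part of the approach above, is to let the channel act as the timer too. Receiver::recv_timeout waits for one period and returns early when a message arrives, so the worker stops without finishing its sleep. The function name run_until_stopped is hypothetical.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Hypothetical helper: performs one unit of work every `period` until a
/// message arrives or the sender is dropped. Returns the iteration count.
fn run_until_stopped(rx: Receiver<()>, period: Duration) -> u32 {
    let mut iterations = 0;
    loop {
        match rx.recv_timeout(period) {
            // No message within `period`: time for the next unit of work.
            Err(RecvTimeoutError::Timeout) => iterations += 1,
            // Explicit stop request, or the controlling side went away.
            Ok(()) | Err(RecvTimeoutError::Disconnected) => return iterations,
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let worker = thread::spawn(move || run_until_stopped(rx, Duration::from_millis(20)));

    thread::sleep(Duration::from_millis(100));
    // Ignore the error case: the worker may already have exited.
    let _ = tx.send(());

    let iterations = worker.join().expect("worker panicked");
    println!("worker ran {iterations} iterations");
}
```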

Suspending and resuming

We use recv(), which suspends the thread until something arrives on the channel. To resume the thread, you need to send something through the channel; the unit value () in this case. If the transmitting end of the channel is dropped, recv() will return Err(RecvError), which we use to exit the loop.

use std::io::{self, BufRead};
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    println!("Press enter to wake up the child thread");
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || loop {
        println!("Suspending...");
        match rx.recv() {
            Ok(_) => {
                println!("Working...");
                thread::sleep(Duration::from_millis(500));
            }
            Err(_) => {
                println!("Terminating.");
                break;
            }
        }
    });

    let mut line = String::new();
    let stdin = io::stdin();
    for _ in 0..4 {
        let _ = stdin.lock().read_line(&mut line);
        let _ = tx.send(());
    }
}

Other tools

Channels are the easiest and the most natural (IMO) way to do these tasks, but not the most efficient one. There are other concurrency primitives which you can find in the std::sync module. They are lower level than channels but can be more efficient for particular tasks.
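As one illustration of a lower-level approach, the sketch below combines two atomic flags with std::thread::park and Thread::unpark (these live in std::thread rather than std::sync). The Control struct and the worker function are hypothetical names, and this is only one of several ways such primitives could be combined.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical shared state for controlling the worker.
struct Control {
    paused: AtomicBool,
    stopped: AtomicBool,
}

/// Works until `stopped` is set and sleeps via `park` while `paused` is set.
/// Returns the number of work iterations performed.
fn worker(ctl: Arc<Control>) -> u32 {
    let mut iterations = 0;
    loop {
        if ctl.stopped.load(Ordering::Acquire) {
            return iterations;
        }
        if ctl.paused.load(Ordering::Acquire) {
            // `park` may wake spuriously, so loop around and re-check the flags.
            thread::park();
            continue;
        }
        // Placeholder for the periodic work.
        iterations += 1;
        thread::sleep(Duration::from_millis(10));
    }
}

fn main() {
    let ctl = Arc::new(Control {
        paused: AtomicBool::new(false),
        stopped: AtomicBool::new(false),
    });
    let handle = {
        let ctl = Arc::clone(&ctl);
        thread::spawn(move || worker(ctl))
    };

    thread::sleep(Duration::from_millis(50));
    ctl.paused.store(true, Ordering::Release); // suspend
    thread::sleep(Duration::from_millis(50));
    ctl.paused.store(false, Ordering::Release); // resume
    handle.thread().unpark();
    thread::sleep(Duration::from_millis(50));
    ctl.stopped.store(true, Ordering::Release); // terminate
    handle.thread().unpark();

    let iterations = handle.join().expect("worker panicked");
    println!("worker ran {iterations} iterations");
}
```

Every change that should wake a parked worker is followed by unpark; because an unpark issued before park makes the next park return immediately, the wake-up is not lost if the worker has not parked yet.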

Upvotes: 47
