unsafe_where_true
unsafe_where_true

Reputation: 6320

Start a forever thread with a expired objects loop, removing those objects, in rust

I am writing a service, and I want to run a loop forever that checks for some expired objects and removes them if too old.

pub struct Obj {
    expired: NaiveDateTime,
}

pub struct Maintainer {
    objs: HashMap<id, Obj>,
}

pub trait Miller {
    fn new() -> Self;
}

impl Miller for Maintainer {
    fn new() -> Self {
        let i = Self {
            obj: Hashmap::new(),
        };
        i.start_exp_observer();
        i
    }
}

impl Maintainer {
    fn start_exp_observer(&self) {
        let observer = thread::spawn(move || loop {
            thread.sleep(sleep_time);
            self.objs
                .retain(|_, o| o.expired.gt(Utc::now().naive_utc()));
        });
        // does this even work here
        observer.join().unwrap();
    }
}

In rust, this does not work, because I am using an immutable self in start_exp_observer, which is the one created by new().

I tried fn start_exp_observer(&mut self) but that puts me in trouble, as the Self created in new is not mutable. And if I define that Self as mutable, then I get trouble with the trait.

And if feels like the more I try, the more trouble I get (if I'd clone Self before returning the object for example, then I guess I am not looking at the same objects in the thread).

How can this be done?

Upvotes: 0

Views: 74

Answers (1)

啊鹿Dizzyi
啊鹿Dizzyi

Reputation: 1050

You need to make sure the self type can pass between thread, and limit only one access at the same time i.e. Mutex and Arc.

use chrono::*;
use std::collections::HashMap;

use std::sync::{Arc, Mutex};

pub struct Obj {
    expired: NaiveDateTime,
}

#[derive(Clone)]
pub struct Maintainer {
    objs: Arc<Mutex<HashMap<u64, Obj>>>,
    //    ^   ^     ^
    //    |   |     |
    //    |   |     L The actual data 
    //    |   L To make sure only one access at a time
    //    L to make the data be able to pass between thread

}

pub trait Miller {
    fn new() -> Self;
}

impl Miller for Maintainer {
    fn new() -> Self {
        let objs = Default::default();
        let i = Self { objs };
        i.start_exp_observer();
        i
    }
}

impl Maintainer {
    fn start_exp_observer(&self) {
        // Clone a self, the clone will be sent to another thread
        let maintainer = self.clone();
        // Spawn a new thread, the `move` will move the `maintainer` variable to another thread
        // while keeping `self` in the current thread
        std::thread::spawn(move || loop {
            std::thread::sleep(std::time::Duration::from_millis(1000));
            maintainer
                .objs 
                .lock()
                // ^^^^ we need to lock the mutex, to make sure we do not have another thread
                //      i.e. the main thread, accessing it at the same time
                .unwrap()
                .retain(|_, o| o.expired.gt(&Utc::now().naive_utc()));
        });
        // Do not `join()` the handle
        // join mean await for it to finish 
        // because it is a infinite loop, the main thread will no progress
    }
}

Upvotes: 1

Related Questions