Happy Machine

Reputation: 1163

Sharing state between threads with notify-rs

I'm new to Rust.

I'm trying to write a file_sensor that starts a counter after a file is created. The plan is that if a second file is not received within a certain amount of time, the sensor exits with a zero exit code.

I could write more code towards that goal, but I feel the code below illustrates the problem (I have also left out, for example, the post function referred to).

I have been struggling with this problem for several hours; I've tried Arc and Mutex, and even global variables.

The Timer implementation comes from ticktock-rs.

I need to be able to either access heartbeat in the match body for EventKind::Create(CreateKind::File), or access file_count in the loop.

The code I've attached here runs, but file_count is always zero in the loop.

use std::env;
use std::path::Path;
use std::{thread, time};
use std::process::ExitCode;
use ticktock::Timer;
use notify::{ 
    Watcher, 
    RecommendedWatcher, 
    RecursiveMode, 
    Result, 
    event::{EventKind, CreateKind, ModifyKind, Event}
};


fn main() -> Result<()> {
    let now = time::Instant::now();
    let mut heartbeat = Timer::apply(
        |_, count| {
            *count += 1;
            *count
        },
        0,
    )
    .every(time::Duration::from_millis(500))
    .start(now);
    let mut file_count = 0;
    let args = Args::parse();
    let REQUEST_SENSOR_PATH = env::var("REQUEST_SENSOR_PATH").expect("$REQUEST_SENSOR_PATH is not set");
    let mut watcher = notify::recommended_watcher(move|res: Result<Event>| {
        match res {
           Ok(event) => {
            match event.kind {
                EventKind::Create(CreateKind::File) => {   
                    file_count += 1;
                    // do something with file
                }
                _ => { /* something else changed */ }
            }
            println!("{:?}", event);

           },
           Err(e) => {
            println!("watch error: {:?}", e);
            ExitCode::from(101);
           },
        }
    })?;

    watcher.watch(Path::new(&REQUEST_SENSOR_PATH), RecursiveMode::Recursive)?;

    loop {
        let now = time::Instant::now();
        if let Some(n) = heartbeat.update(now){
            println!("Heartbeat: {}, fileCount: {}", n, file_count);
            if n > 10 {
                heartbeat.set_value(0);
                // This function will reset timer when a file arrives
            }
        }
    }
    Ok(())
}

Upvotes: 1

Views: 529

Answers (1)

Finomnis

Reputation: 22838

Your compiler warnings show you the problem:

warning: unused variable: `file_count`
  --> src/main.rs:31:25
   |
31 |                         file_count += 1;
   |                         ^^^^^^^^^^
   |
   = note: `#[warn(unused_variables)]` on by default
   = help: did you mean to capture by reference instead?

The problem is that you use file_count inside a move || closure. file_count is an i32, which is Copy. Using it in a move || closure copies it into the closure, so assigning to it inside the closure no longer updates the original variable.
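To see the copy in isolation, here is a minimal sketch without notify. The function name move_closure_copies is made up for this illustration; it only demonstrates that the closure increments its own copy while the original stays at zero.

```rust
/// Returns (value seen inside the closure, value of the original variable).
/// `move_closure_copies` is a hypothetical name used only for this demo.
fn move_closure_copies() -> (i32, i32) {
    let mut file_count = 0;

    // `move` copies the i32 into the closure because i32 is Copy.
    // From here on, the closure owns a separate `file_count`.
    let mut increment = move || {
        file_count += 1;
        file_count
    };

    increment();
    let inside = increment();

    // The original variable was never touched by the closure.
    (inside, file_count)
}

fn main() {
    let (inside, outside) = move_closure_copies();
    println!("inside: {inside}, outside: {outside}"); // prints "inside: 2, outside: 0"
}
```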

Either way, an event handler cannot directly modify a local variable of main(). Event handlers require a 'static lifetime for anything they reference, because Rust cannot guarantee that main outlives the event handler.

One solution to this problem is reference counting combined with interior mutability. In this case, I will use Arc for reference counting and AtomicI32 for interior mutability. Note that notify::recommended_watcher requires thread safety; otherwise, instead of an Arc<AtomicI32> we could have used an Rc<Cell<i32>>, which is the same thing but only for single-threaded environments, with a little less overhead.

use notify::{
    event::{CreateKind, Event, EventKind},
    RecursiveMode, Result, Watcher,
};
use std::time;
use std::{env, sync::atomic::Ordering};
use std::{path::Path, sync::Arc};
use std::{process::ExitCode, sync::atomic::AtomicI32};
use ticktock::Timer;

fn main() -> Result<()> {
    let now = time::Instant::now();
    let mut heartbeat = Timer::apply(
        |_, count| {
            *count += 1;
            *count
        },
        0,
    )
    .every(time::Duration::from_millis(500))
    .start(now);
    let file_count = Arc::new(AtomicI32::new(0));

    let REQUEST_SENSOR_PATH =
        env::var("REQUEST_SENSOR_PATH").expect("$REQUEST_SENSOR_PATH is not set");
    let mut watcher = notify::recommended_watcher({
        let file_count = Arc::clone(&file_count);
        move |res: Result<Event>| {
            match res {
                Ok(event) => {
                    match event.kind {
                        EventKind::Create(CreateKind::File) => {
                            file_count.fetch_add(1, Ordering::AcqRel);
                            // do something with file
                        }
                        _ => { /* something else changed */ }
                    }
                    println!("{:?}", event);
                }
                Err(e) => {
                    println!("watch error: {:?}", e);
                    ExitCode::from(101);
                }
            }
        }
    })?;

    watcher.watch(Path::new(&REQUEST_SENSOR_PATH), RecursiveMode::Recursive)?;

    loop {
        let now = time::Instant::now();
        if let Some(n) = heartbeat.update(now) {
            println!(
                "Heartbeat: {}, fileCount: {}",
                n,
                file_count.load(Ordering::Acquire)
            );
            if n > 10 {
                heartbeat.set_value(0);
                // This function will reset timer when a file arrives
            }
        }
    }
}
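
If it helps, here is the same Arc<AtomicI32> pattern reduced to plain std::thread, so it can be tried without notify or ticktock. This is only a sketch: count_from_threads and its parameters are hypothetical names, and the spawned threads stand in for the watcher's event handler.

```rust
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
use std::thread;

/// Spawns `threads` workers that each add 1 to a shared counter
/// `increments_per_thread` times, then returns the final value.
/// (Hypothetical helper, standing in for the watcher callback.)
fn count_from_threads(threads: i32, increments_per_thread: i32) -> i32 {
    let counter = Arc::new(AtomicI32::new(0));

    let handles: Vec<_> = (0..threads)
        .map(|_| {
            // Each worker gets its own handle to the same counter.
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..increments_per_thread {
                    counter.fetch_add(1, Ordering::SeqCst);
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().expect("worker thread panicked");
    }

    // The original handle sees every update made by the workers.
    counter.load(Ordering::SeqCst)
}

fn main() {
    println!("total: {}", count_from_threads(4, 1000)); // prints "total: 4000"
}
```

Unlike the plain i32 in the question, the move closure here moves a clone of the Arc, not a copy of the number, so every handle points at the same AtomicI32.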

Also, note that ExitCode::from(101); gives you a warning. It does not actually exit the program; it only creates an exit code value and then discards it again. You probably intended to write std::process::exit(101);. I would discourage that, though, because it does not clean up properly (it does not call any Drop implementations). I'd use panic here instead. This is the exact use case panic is meant for.
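
For reference, an ExitCode only has an effect when it is returned from main. A minimal sketch of that, assuming a made-up helper status_for that maps an outcome to a numeric status (it is not part of the code above):

```rust
use std::process::ExitCode;

// Hypothetical helper: maps an outcome to a numeric exit status.
fn status_for(outcome: &Result<(), String>) -> u8 {
    match outcome {
        Ok(()) => 0,
        Err(_) => 101,
    }
}

fn main() -> ExitCode {
    // Placeholder for the real work; replace with your own logic.
    let outcome: Result<(), String> = Ok(());

    if let Err(message) = &outcome {
        eprintln!("error: {message}");
    }

    // This value matters only because it is *returned* from main.
    // Writing `ExitCode::from(101);` as a statement just discards it.
    ExitCode::from(status_for(&outcome))
}
```

Because main returns normally here, local variables are dropped as usual before the process exits.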

Upvotes: 2
