Timmmm
Timmmm

Reputation: 96655

Tokio mpsc channel doesn't work across tasks

I have code that sends file modification events over a tokio channel. The messages originate in a dedicated thread (I think), and end up in a tokio thread. It works fine if the received is in the main tokio task but if I move it to a spawned task then for some reason it rx.next() immediately fails. Here's the code:

use futures::{
    channel::mpsc::{channel, Receiver},
    SinkExt, StreamExt,
};
use notify::{Event, RecommendedWatcher, RecursiveMode, Watcher};
use std::path::Path;

fn async_watcher() -> notify::Result<(RecommendedWatcher, Receiver<notify::Result<Event>>)> {
    let (mut tx, rx) = channel(1);

    let watcher = RecommendedWatcher::new(move |res| {
        futures::executor::block_on(async {
            tx.send(res).await.unwrap();
        })
    })?;

    Ok((watcher, rx))
}

#[tokio::main]
async fn main() {
    let path = std::env::args()
        .nth(1)
        .expect("Argument 1 needs to be a path");

    println!("watching {}", path);

//    watch_with_task(&path).await.unwrap();
    watch_without_task(&path).await;
}

fn watch_with_task(path: &str) -> tokio::task::JoinHandle<()> {
    let (mut watcher, mut rx) = async_watcher().unwrap();

    watcher
        .watch(Path::new(path), RecursiveMode::NonRecursive)
        .unwrap();

    tokio::spawn(async move {
        eprintln!("Watch task spawned");
        while let Some(res) = rx.next().await {
            match res {
                Ok(event) => eprintln!("File changed: {:?}", event),
                Err(e) => eprintln!("Watch error: {:?}", e),
            }
        }
        eprintln!("Watch task finished");
    })
}
async fn watch_without_task(path: &str) {
    let (mut watcher, mut rx) = async_watcher().unwrap();

    watcher
        .watch(Path::new(path), RecursiveMode::NonRecursive)
        .unwrap();

    while let Some(res) = rx.next().await {
        match res {
            Ok(event) => eprintln!("File changed: {:?}", event),
            Err(e) => eprintln!("Watch error: {:?}", e),
        }
    }
}

Dependencies:

[dependencies]
futures = "0.3"
notify = "5.0.0-pre.13"
tokio = { version = "1.6", features = ["full"] }

With watch_without_task I get this output - it works:

watching /path/to/file
File changed: Event { kind: Modify(Metadata(Any)), paths: ["/path/to/file"], attr:tracker: None, attr:flag: None, attr:info: None, attr:source: None }
File changed: Event { kind: Modify(Data(Content)), paths: ["/path/to/file"], attr:tracker: None, attr:flag: None, attr:info: None, attr:source: None }

With watch_with_task I get this:

watching /path/to/file
Watch task spawned
Watch task finished

It exits immediately. Why?

Upvotes: 1

Views: 847

Answers (1)

Timmmm
Timmmm

Reputation: 96655

Oooo actually I was wrong. The issue is that when using the task the watcher gets dropped immediately, whereas without the task it is kept alive by the while loop.

Upvotes: 2

Related Questions