Blue7

Reputation: 2044

rust: Awaiting future causes block when using a simple future from the futures_signals crate

Unfortunately, the tutorial and documentation for futures_signals don't show how to make the code behind the future actually execute.

I have tried using:

use futures_signals::signal::Mutable;
use futures_signals::signal::SignalExt; //extension trait for signals (gives for_each)
use futures::executor::LocalPool;

fn main(){
    let my_state = Mutable::new(5);
    
    let future = my_state.signal().for_each(|value| {
        // This code is run for the current value of my_state,
        // and also every time my_state changes
        println!("{}", value);
        async {}
    });
    println!("Awaiting...");
    let mut pool = LocalPool::new();
    pool.run_until(future);
    println!("Done!");
}

This prints "5", but then blocks forever. "Done!" is never printed.

Why does it not see that the future has completed?

Upvotes: 0

Views: 531

Answers (1)

Blue7

Reputation: 2044

To expand upon jmb's comment: the future only completes once every handle to the Mutable (the original and all of its clones) has been dropped. Until then the Mutable can still change state, and the callback is called every time it changes, so the future is complete only when no more callbacks can be called. In the question, my_state is still alive while pool.run_until runs, so the future never finishes.

See below for a fully working example:

use futures_signals::signal::Mutable;
use futures_signals::signal::SignalExt; //extension trait for signals (gives for_each)
use futures::executor::LocalPool;
use std::thread;
use futures::join;

fn main(){

    //create my_state, and a clone that will be passed to the thread
    let my_state = Mutable::new(5);
    let my_state_shared = my_state.clone();

    //increment my_state by 1 in a loop, until it reaches 10
    thread::spawn(move || {
        loop {
            my_state_shared.set(my_state_shared.get() + 1);
            thread::sleep(std::time::Duration::from_secs(1));
            if my_state_shared.get() == 10 {
                break;
            }
        }
    });
    
    //create observers 
    let obs_a_future = my_state.signal().for_each(|value| {
        println!("Observer A {}", value);
        async {}
    });
    let obs_b_future = my_state.signal().for_each(|value| {
        println!("Observer B {}", value);
        async {}
    });

    //drop the original handle; the signals end once the thread's clone is dropped too
    drop(my_state);

    //run the app
    let mut pool = LocalPool::new();
    pool.run_until( async {
        join!(obs_a_future, obs_b_future);
    });

    println!("!");

}
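
The "complete only when every handle is dropped" rule is not specific to futures_signals. As a rough analogy using only the standard library (this is std::sync::mpsc, not the futures_signals API, and the function name collect_until_closed is made up for illustration), a channel receiver stops iterating only once every Sender clone has been dropped:

```rust
use std::sync::mpsc;
use std::thread;

/// Receives values until every `Sender` handle has been dropped.
/// Hypothetical helper, used only to illustrate the drop rule.
fn collect_until_closed() -> Vec<i32> {
    let (tx, rx) = mpsc::channel();
    // Clone that is moved into the producer thread,
    // like my_state_shared in the answer above.
    let tx_shared = tx.clone();

    let producer = thread::spawn(move || {
        for value in 6..=10 {
            tx_shared.send(value).expect("receiver is still alive");
        }
        // tx_shared is dropped here, when the closure returns.
    });

    // Drop the original handle. Without this line the loop below
    // would block forever, just like the future in the question.
    drop(tx);

    // iter() blocks for each value and ends once all senders are gone.
    let received: Vec<i32> = rx.iter().collect();
    producer.join().expect("producer thread panicked");
    received
}

fn main() {
    let values = collect_until_closed();
    println!("{:?}", values);
    println!("Done!");
}
```

Commenting out drop(tx) makes this program hang after the last value, which mirrors the behaviour in the question.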

Upvotes: 1
