Sébastien Renauld

Reputation: 19662

How can I chain two futures on the same resource without having to define every single method combination ahead of time?

I am writing the code to bootstrap and connect to a 2G/3G network using a SIM800L modem. This modem is interfaced with a single serial channel, which I've muxed outside of this project into 4 channels (data, text interface, control interface, status messages).

In order to bootstrap this, I need to run a series of sequential commands. This sequence changes based on the output of the modem (is the SIM locked? What kind of info does the SIM need to be unlocked? What kind of APN are we getting on? What kind of network selection do we want?). I initially thought that this would be a perfect application for futures, as each individual operation can be very costly in terms of time spent idling (AT+COPS, one of the commands, takes up to 10s to return).

I've arrived at something like the code below. It compiles and seems to execute the commands sequentially, but the third operation comes out empty. My question is twofold: why do the commands run not pop up in the result of the last future, and is there a more robust way of doing something like this?

#![feature(conservative_impl_trait)]

extern crate futures;
extern crate tokio_core;

use std::sync::{Arc, Mutex};
use futures::{future, Future};
use tokio_core::reactor::Core;
use futures::sync::oneshot;
use std::thread;
use std::io;
use std::time::Duration;

pub struct Channel {
    operations: Arc<Mutex<Vec<String>>>,
}

impl Channel {
    pub fn ops(&mut self) -> Box<Future<Item = Vec<String>, Error = io::Error>> {
        println!("{:?}", self.operations);
        let ops = Arc::clone(&self.operations);
        let ops = ops.lock().unwrap();
        future::ok::<Vec<String>, io::Error>(ops.to_vec()).boxed()
    }

    pub fn run(&mut self, command: &str) -> Box<Future<Item = Vec<String>, Error = io::Error>> {
        let (tx, rx) = oneshot::channel::<Vec<String>>();

        let ops = Arc::clone(&self.operations);
        let str_cmd = String::from(command);
        thread::spawn(move || {
            thread::sleep(Duration::new(0, 10000));

            let mut ops = ops.lock().unwrap();
            ops.push(str_cmd.clone());
            println!("Pushing op: {}", str_cmd.clone());
            tx.send(vec!["OK".to_string()])
        });

        rx.map_err(|_| io::Error::new(io::ErrorKind::NotFound, "Test"))
            .boxed()
    }
}

pub struct Channels {
    inner_object: Arc<Mutex<Channel>>,
}

impl Channels {
    pub fn one(&self, cmd: &str) -> Box<Future<Item = Vec<String>, Error = io::Error>> {
        let v = Arc::clone(&self.inner_object);
        let mut v = v.lock().unwrap();
        v.run(&cmd)
    }

    pub fn ops(&self) -> Box<Future<Item = Vec<String>, Error = io::Error>> {
        let v = Arc::clone(&self.inner_object);
        let mut v = v.lock().unwrap();
        v.ops()
    }

    pub fn run_command(&self) -> Box<Future<Item = (), Error = io::Error>> {
        let a = self.one("AT+CMEE=2");
        let b = self.one("AT+CREG=0");
        let c = self.ops();
        Box::new(a.and_then(|result_1| {
            assert_eq!(result_1, vec![String::from("OK")]);
            b.and_then(|result_2| {
                assert_eq!(result_2, vec![String::from("OK")]);
                c.map(move |ops| {
                    assert_eq!(
                        ops.as_slice(),
                        ["AT+CMEE=2".to_string(), "AT+CREG=0".to_string()]
                    );
                })
            })
        }))
    }
}

fn main() {
    let mut core = Core::new().expect("Core should be created");
    let channels = Channels {
        inner_object: Arc::new(Mutex::new(Channel {
            operations: Arc::new(Mutex::new(vec![])),
        })),
    };
    let result = core.run(channels.run_command()).expect("Should've worked");

    println!("{:?}", result);
}


Upvotes: 1

Views: 195

Answers (1)

Shepmaster

Reputation: 430711

why do the commands run not pop up in the result of the last future

Because you haven't sequenced the operations to occur in that way:

let a = self.one("AT+CMEE=2");
let b = self.one("AT+CREG=0");
let c = self.ops();

This immediately builds:

  • a, b — promises that sleep a while before they respond
  • c — a promise that gets the ops in the vector

c copies the vector at the moment self.ops() is called, not when the chain reaches it. At that point the sleeps have yet to finish, so no operations have been recorded and the copied vector is empty.
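To see the effect in isolation, here is a minimal sketch that uses only the standard library rather than the futures crate. The function name eager_vs_deferred is made up for illustration, and the closure is just a stand-in for work placed inside and_then, which only runs once the chain reaches it.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical helper, not part of the question's code.
// Returns (snapshot taken up front, snapshot taken after the commands ran).
pub fn eager_vs_deferred() -> (Vec<String>, Vec<String>) {
    let ops: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));

    // Eager: like `let c = self.ops()`, this copies the vector right now,
    // before any command has been recorded.
    let eager = ops.lock().unwrap().clone();

    // Deferred: nothing is read until the closure is called.
    let ops_for_later = Arc::clone(&ops);
    let deferred = move || ops_for_later.lock().unwrap().clone();

    // The commands complete some time after both values above were built.
    ops.lock().unwrap().push("AT+CMEE=2".to_string());
    ops.lock().unwrap().push("AT+CREG=0".to_string());

    (eager, deferred())
}
```

Calling eager_vs_deferred() gives an empty first vector and both commands in the second, which mirrors the difference between building c up front and reading the operations inside the chain.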

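Future::and_then is intended to be used to define sequential operations. This is complicated in your case because you want to use self in the body of the and_then closure, and the boxed future cannot hold a borrow of self (a Box<Future<...>> trait object is 'static by default). You can clone the Arc<Channel> and move the clone into the closure instead.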

You'll note that I've made a number of simplifications:

  • Returning a String instead of Vec<String>
  • Removing unused mut qualifiers and a Mutex
  • Returning the operations Vec directly

extern crate futures;
extern crate tokio_core;

use std::sync::{Arc, Mutex};
use futures::Future;
use tokio_core::reactor::Core;
use futures::sync::oneshot;
use std::thread;
use std::io;
use std::time::Duration;

pub struct Channel {
    operations: Arc<Mutex<Vec<String>>>,
}

impl Channel {
    fn ops(&self) -> Vec<String> {
        self.operations.lock().unwrap().clone()
    }

    fn command(&self, command: &str) -> Box<Future<Item = String, Error = io::Error>> {
        let (tx, rx) = oneshot::channel();

        let ops = Arc::clone(&self.operations);
        let str_cmd = String::from(command);

        thread::spawn(move || {
            thread::sleep(Duration::new(0, 10000));

            println!("Pushing op: {}", str_cmd);
            ops.lock().unwrap().push(str_cmd);

            tx.send("OK".to_string())
        });

        Box::new(rx.map_err(|_| io::Error::new(io::ErrorKind::NotFound, "Test")))
    }
}

struct Channels {
    data: Arc<Channel>,
}

impl Channels {
    fn run_command(&self) -> Box<Future<Item = (), Error = io::Error>> {
        let d2 = Arc::clone(&self.data);
        let d3 = Arc::clone(&self.data);

        Box::new(
            self.data
                .command("AT+CMEE=2")
                .and_then(move |cmee_answer| {
                    assert_eq!(cmee_answer, "OK"); // This should be checked in `command` and be a specific Error
                    d2.command("AT+CREG=0")
                })
                .map(move |creg_answer| {
                    assert_eq!(creg_answer, "OK"); // This should be checked in `command` and be a specific Error
                    let ops = d3.ops();
                    assert_eq!(ops, ["AT+CMEE=2", "AT+CREG=0"])
                }),
        )
    }
}

fn main() {
    let mut core = Core::new().expect("Core should be created");
    let channels = Channels {
        data: Arc::new(Channel {
            operations: Arc::new(Mutex::new(vec![])),
        }),
    };
    let result = core.run(channels.run_command()).expect("Should've worked");

    println!("{:?}", result);
}

However, this isn't the type of code I usually see with futures. Instead of taking &self, many future-based APIs take self by value and hand it back alongside the result once the future resolves. Let's see how that would look:

extern crate futures;
extern crate tokio_core;

use std::sync::{Arc, Mutex};
use futures::Future;
use tokio_core::reactor::Core;
use futures::sync::oneshot;
use std::thread;
use std::io;
use std::time::Duration;

#[derive(Clone)]
pub struct Channel {
    operations: Arc<Mutex<Vec<String>>>,
}

impl Channel {
    fn ops(&self) -> Arc<Mutex<Vec<String>>> {
        Arc::clone(&self.operations)
    }

    fn command(self, command: &str) -> Box<Future<Item = (Self, String), Error = io::Error>> {
        let (tx, rx) = oneshot::channel();
        let str_cmd = String::from(command);

        thread::spawn(move || {
            thread::sleep(Duration::new(0, 10000));

            println!("Pushing op: {}", str_cmd);
            self.operations.lock().unwrap().push(str_cmd);

            tx.send((self, "OK".to_string()))
        });

        Box::new(rx.map_err(|_| io::Error::new(io::ErrorKind::NotFound, "Test")))
    }
}

struct Channels {
    data: Channel,
}

impl Channels {
    fn run_command(self) -> Box<Future<Item = (), Error = io::Error>> {
        Box::new(
            self.data
                .command("AT+CMEE=2")
                .and_then(|(channel, cmee_answer)| {
                    assert_eq!(cmee_answer, "OK");
                    channel.command("AT+CREG=0")
                })
                .map(|(channel, creg_answer)| {
                    assert_eq!(creg_answer, "OK");
                    let ops = channel.ops();
                    let ops = ops.lock().unwrap();
                    assert_eq!(*ops, ["AT+CMEE=2", "AT+CREG=0"]);
                }),
        )
    }
}

fn main() {
    let mut core = Core::new().expect("Core should be created");
    let channels = Channels {
        data: Channel {
            operations: Arc::new(Mutex::new(vec![])),
        },
    };
    let result = core.run(channels.run_command()).expect("Should've worked");

    println!("{:?}", result);
}
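
The by-value style also helps when the sequence is not known ahead of time: because each step hands the channel back, the next command can be picked at runtime. What follows is a blocking, standard-library-only sketch of that idea, not futures code. run_sequence, Channel::new and the String error type are assumptions made for illustration; with futures 0.1 the same shape would be expressed with combinators (for example future::loop_fn) rather than a for loop.

```rust
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

pub struct Channel {
    operations: Arc<Mutex<Vec<String>>>,
}

impl Channel {
    // Hypothetical constructor, added for this sketch only.
    pub fn new() -> Self {
        Channel { operations: Arc::new(Mutex::new(Vec::new())) }
    }

    pub fn ops(&self) -> Vec<String> {
        self.operations.lock().unwrap().clone()
    }

    // Consumes the channel and returns it together with the reply,
    // like the by-value `command` above, but blocking instead of
    // returning a future.
    pub fn command(self, command: &str) -> Result<(Self, String), String> {
        let str_cmd = String::from(command);
        let worker = thread::spawn(move || {
            thread::sleep(Duration::from_millis(1)); // simulated modem latency
            self.operations.lock().unwrap().push(str_cmd);
            (self, "OK".to_string())
        });
        worker.join().map_err(|_| "worker thread panicked".to_string())
    }
}

// Runs the commands one after another, threading the channel through
// every step and collecting the replies.
pub fn run_sequence(
    mut channel: Channel,
    commands: &[&str],
) -> Result<(Channel, Vec<String>), String> {
    let mut replies = Vec::new();
    for cmd in commands {
        let (next, reply) = channel.command(cmd)?;
        channel = next;
        replies.push(reply);
    }
    Ok((channel, replies))
}
```

Passing a slice such as &["AT+CMEE=2", "AT+CREG=0"] to run_sequence runs both commands in order. Since the loop body sees each reply before the next command is issued, it is also the natural place to decide what to send next based on the modem's answer.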

Upvotes: 1
