Adrien Gras

Reputation: 47

Lifetime issues with Rocket fairings, tokio-cron-scheduler and crons

I'm currently building an API engine on top of Rocket and, as many modern apps do, I wanted to include an automated scheduler to run async tasks (aka crons) while the Rocket API is running.

I decided to use a Rocket fairing to start the scheduler, which is built around tokio-cron-scheduler, on the "liftoff" event.

I have set up all the required parts (logging to the database, structs and traits), but I get a strange compile error about the lifetimes in my fairing.

Here is a walkthrough of my code:

-> This is my "command" module, containing all the structural parts needed to build and move commands (aka crons) within my application.

/// Summarizes a command execution result.
pub enum CommandResult {
    SUCCESS,
    ERROR(String),
    SKIPPED(String),
}

/// Trait to define structs as runnable async crons with tokio_scheduler
#[async_trait]
pub trait Command: Send + Sync {
    /// returns the current command name
    fn get_command_name(&self) -> String;

    /// returns the current command argument payload
    fn get_command_args(&self) -> Option<HashMap<String, String>>;

    /// returns the "cron_middleware"
    fn get_cron_middleware(&self) -> CronLogMiddleware<CronLogRepository>;

    /// real body for the command execution, must be overridden in impls.
    async fn do_run(&self) -> Result<CommandResult>;

    /// starts the command process by validating command lock, and registering an open cron log into database.
    async fn begin(&self) -> Result<CronLog> {
        // ...
    }

    /// ends the command process by releasing command lock, and registering the result of the command to an opened cron log into database.
    async fn end(&self, cron_log: &CronLog, result: CommandResult) -> Result<()> {
        // ...
    }

    /// hidden runner of commands, uses begin, end and do_run, and will be used by runner.
    async fn run(&self) -> Result<()> {
        // ...
    }

    /// generates a unique key for this command name + args, for locks purposes
    fn generate_unicity_key(&self) -> String {
        // ...
    }

    /// converts command args as a string payload
    #[allow(clippy::or_fun_call)]
    fn get_command_args_as_string(&self) -> String {
        // ...
    }
}

/// struct to move a command + its cron schedule into scheduler.
pub struct CommandHandle<T: Command + ?Sized + Send + Sync> {
    pub command: Box<T>,
    pub schedule: String,
}

Then, for testing purposes, I created a test command struct like this:

/// a command for testing purposes
pub struct TestCommand {
    pub name: String,
    pub args: Option<HashMap<String, String>>,
    pub cron_log_middleware: CronLogMiddleware<CronLogRepository>,
}

#[async_trait]
impl Command for TestCommand {
    // accessors (get_... functions)

    async fn do_run(&self) -> Result<CommandResult> {
        debug!("executed !");

        Ok(CommandResult::SUCCESS)
    }
}

The Rocket builder looks like this:

    let mut sched = CronScheduler::default();

    sched.add_cron(CommandHandle {
        command: Box::new(TestCommand {
            name: "app:test".to_string(),
            args: None,
            cron_log_middleware: cron_log_middleware.clone(),
        }),
        schedule: "*/1 * * * *".to_string(),
    });

    // then I add sched to rocket with .manage()

And the fairing looks like this:


/// a rocket fairing enabling async tasks (eg crons) while rocket is launching
#[derive(Default)]
pub struct CronScheduler {
    crons: Vec<CommandHandle<dyn Command>>,
}

impl CronScheduler {
    /// adds a cron (eg CommandHandle with a given command) to run with the scheduler.
    pub fn add_cron(&mut self, cron: CommandHandle<dyn Command>) {
        self.crons.push(cron);
    }
}

#[rocket::async_trait]
impl Fairing for CronScheduler {
    //...
    // the compiler reports the error on `&self` in the signature below
    async fn on_liftoff(&self, _rocket: &Rocket<Orbit>) {
        let sched = SchedulerBuilder::build().await;

        for handle in self.crons.iter() {
            let job = Job::new_cron_job_async(handle.schedule.as_str(), |_uid, _lock| {
                Box::pin(async move {
                    handle.command.run().await;
                })
            })
            .unwrap();

            sched.add(job).await.unwrap();
        }

        sched.start().await.unwrap();
    }
}

And I get this error:

error[E0759]: `self` has lifetime `'life0` but it needs to satisfy a `'static` lifetime requirement
  --> src/core/fairings/cron_scheduler.rs:34:26
   |
34 |       async fn on_liftoff(&self, rocket: &Rocket<Orbit>) {
   |                            ^^^^ this data with lifetime `'life0`...
...
39 | /                 Box::pin(async move {
40 | |                     handle.command.run().await;
41 | |                 })
   | |__________________- ...is used and required to live as long as `'static` here
   |
note: `'static` lifetime requirement introduced by the return type
  --> src/core/fairings/cron_scheduler.rs:34:5
   |
34 |       async fn on_liftoff(&self, rocket: &Rocket<Orbit>) {
   |       ^^^^^ requirement introduced by this return type
...
39 | /                 Box::pin(async move {
40 | |                     handle.command.run().await;
41 | |                 })
   | |__________________- because of this returned expression

And my Cargo.toml, shortened:

[dependencies]
rocket = {version = "0.5.0-rc.2", features = ["json"]}
# ...
tokio-cron-scheduler = {version = "0.8.1", features = ["signal"]}
# ...

I tried different solutions and noticed that the Command is what causes the issue: if I replace the content of the "Box::pin(...)" with something like a println!, nothing goes wrong.

I don't know whether it's a conflict between async-trait and Rocket's async traits or something else, but I cannot figure it out.

Edit 1: shortened a lot of the code because the post was way too long.

Edit 2: solution found thanks to the accepted answer; here is the final patched code in case it helps anyone.

For the record, I had to implement Clone by hand (rather than deriving it), using the code from the accepted answer as a reference.

// command 

pub struct CommandHandle<T: Command + ?Sized + Send + Sync> {
    pub command: Arc<T>,
    pub schedule: String,
}

impl<T> Clone for CommandHandle<T>
where
    T: Command + ?Sized + Send + Sync,
{
    fn clone(&self) -> Self {
        Self {
            command: self.command.clone(),
            schedule: self.schedule.clone(),
        }
    }
}

// fairing 

    async fn on_liftoff(&self, _rocket: &Rocket<Orbit>) {
        let sched = SchedulerBuilder::build().await;

        for handle in self.crons.iter() {
            let schedule = handle.schedule.clone();

            let handle = handle.clone();

            let job = Job::new_cron_job_async(schedule.as_str(), move |_uid, _lock| {
                let handle = handle.clone();

                Box::pin(async move {
                    handle.command.run().await.unwrap();
                })
            })
            .unwrap();

            sched.add(job).await.unwrap();
        }

        sched.start().await.unwrap();
    }
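
A note on why the hand-written Clone is needed: `#[derive(Clone)]` adds a `T: Clone` bound to the generated impl, and `dyn Command` is not `Clone`, so the derived impl would not cover `CommandHandle<dyn Command>`. Below is a minimal, std-only sketch of that point. The trait is reduced to a single hypothetical `name` method and `make_handle` is a helper made up for the example; neither is the real code from the post.

```rust
use std::sync::Arc;

// Reduced, hypothetical version of the `Command` trait from the post.
trait Command: Send + Sync {
    fn name(&self) -> String;
}

struct TestCommand;

impl Command for TestCommand {
    fn name(&self) -> String {
        "app:test".to_string()
    }
}

// `#[derive(Clone)]` here would generate an impl that requires `T: Clone`,
// which `dyn Command` cannot satisfy.
struct CommandHandle<T: Command + ?Sized> {
    command: Arc<T>,
    schedule: String,
}

// Manual impl: no `T: Clone` bound, because cloning an `Arc<T>` only bumps
// the reference count and never clones the `T` itself.
impl<T: Command + ?Sized> Clone for CommandHandle<T> {
    fn clone(&self) -> Self {
        Self {
            command: Arc::clone(&self.command),
            schedule: self.schedule.clone(),
        }
    }
}

// Hypothetical helper building a type-erased handle.
fn make_handle() -> CommandHandle<dyn Command> {
    CommandHandle {
        command: Arc::new(TestCommand),
        schedule: "*/1 * * * *".to_string(),
    }
}

fn main() {
    let handle = make_handle();
    let copy = handle.clone();
    // Both handles point at the same command instance.
    println!("{} scheduled at {}", copy.command.name(), copy.schedule);
    println!("shared owners: {}", Arc::strong_count(&handle.command));
}
```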

Upvotes: 2

Views: 669

Answers (1)

kmdreko

Reputation: 60132

Job::new_cron_job_async requires that your closure is 'static, but it is not since handle is a reference to self.crons.
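
To see the `'static` requirement in isolation, here is a minimal std-only sketch. `register`, `Callback`, `Registry` and `run_all` are hypothetical stand-ins invented for the example, not part of tokio-cron-scheduler; `register` only mimics the `'static` bound on the closure. A closure that captures a reference obtained from `&self` is rejected, while one that owns its data (here via a cloned `Arc`) is accepted.

```rust
use std::sync::Arc;

// Hypothetical callback type; a boxed trait object is `'static` by default.
type Callback = Box<dyn FnMut() -> String + Send>;

/// Hypothetical stand-in for a scheduler API that, like the job constructor
/// in the question, only accepts `'static` closures.
fn register<F>(job: F) -> Callback
where
    F: FnMut() -> String + Send + 'static,
{
    Box::new(job)
}

struct Registry {
    names: Vec<Arc<str>>,
}

impl Registry {
    fn callbacks(&self) -> Vec<Callback> {
        let mut callbacks = Vec::new();
        for name in self.names.iter() {
            // `name` is a `&Arc<str>` borrowed from `self`. Capturing it
            // directly would not compile, since the closure would only live
            // as long as the borrow of `self`:
            // callbacks.push(register(move || format!("running {name}")));

            // Cloning the `Arc` gives the closure an owned handle instead.
            let name = Arc::clone(name);
            callbacks.push(register(move || format!("running {name}")));
        }
        callbacks
    }
}

/// Calls every callback once and collects the results.
fn run_all(callbacks: Vec<Callback>) -> Vec<String> {
    callbacks.into_iter().map(|mut callback| callback()).collect()
}

fn main() {
    let registry = Registry { names: vec![Arc::from("app:test")] };
    let callbacks = registry.callbacks();
    drop(registry); // the callbacks do not borrow from the registry
    for line in run_all(callbacks) {
        println!("{line}");
    }
}
```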

Taking a quick look at your structure, if you use Arc instead of Box in CommandHandle, then it is easily cloned and thus can give a 'static handle to the cron job:

pub struct CommandHandle<T: Command + ?Sized + Send + Sync> {
    pub command: Arc<T>, // <------------
    pub schedule: String,
}

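// impl Clone for CommandHandle ...

for handle in self.crons.iter() {
    let handle = handle.clone(); // <------------
    // `schedule` is cloned separately: borrowing `handle.schedule` for the first
    // argument while `move`-ing `handle` into the closure would not compile.
    let schedule = handle.schedule.clone();
    let job = Job::new_cron_job_async(schedule.as_str(), move |_uid, _lock| { // <-- `move`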
        let handle = handle.clone(); // <------------
        Box::pin(async move {
            handle.command.run().await;
        })
    })
    .unwrap();

    sched.add(job).await.unwrap();
}

It's hard to verify since your posted code is incomplete, but I believe you need both clones above: the closure needs ownership of the handle to be 'static, but it must also be FnMut so that it can be called multiple times, so the handle can't be moved into the async block directly.
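
The two-clone pattern can be tried without any scheduler crate. The following is only a sketch under assumptions: `run_ticks`, `block_on`, `BoxFuture` and `count_runs` are hypothetical helpers written for the example, and `run_ticks` merely imitates a scheduler that calls the closure once per tick. The outer clone makes the closure own its handle; the inner clone hands each future a fresh handle so the closure stays `FnMut`.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Tiny executor for futures that never really wait; illustration only.
fn block_on(mut fut: BoxFuture) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    while fut.as_mut().poll(&mut cx).is_pending() {}
}

/// Hypothetical stand-in for the scheduler: asks the closure for a new
/// future on every tick, which is why the closure has to be `FnMut`.
fn run_ticks<F>(mut make_job: F, ticks: usize)
where
    F: FnMut() -> BoxFuture + Send + 'static,
{
    for _ in 0..ticks {
        block_on(make_job());
    }
}

/// Runs a counting job `ticks` times and returns how often it executed.
fn count_runs(ticks: usize) -> usize {
    let counter = Arc::new(AtomicUsize::new(0));

    // First clone: the closure owns this handle, so it is `'static`.
    let handle = Arc::clone(&counter);
    run_ticks(
        move || {
            // Second clone: each future gets its own handle. Moving `handle`
            // itself into the async block would make the closure `FnOnce`.
            let handle = Arc::clone(&handle);
            Box::pin(async move {
                handle.fetch_add(1, Ordering::SeqCst);
            })
        },
        ticks,
    );

    counter.load(Ordering::SeqCst)
}

fn main() {
    println!("job ran {} times", count_runs(3));
}
```

Removing the inner `Arc::clone` line makes the compiler reject the closure because it only implements `FnOnce`, which matches the reasoning given above.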

Upvotes: 2
