Reputation: 47
I'm currently building an API engine on top of Rocket, and as modern apps do, I wanted to include an automated scheduler to run async tasks (aka crons) while the Rocket API is running.
I decided to use a Rocket fairing to start the scheduler, which is built around tokio-cron-scheduler, on the "liftoff" event.
I have set up all the required parts (logging to the database, structs and traits), but I get a strange compile error about the lifetimes in my fairing.
Here is a walkthrough of my code:
This is my "command" module, containing all the structural parts needed to build and move commands (aka crons) within my application.
/// Summarizes a command execution result.
pub enum CommandResult {
    SUCCESS,
    ERROR(String),
    SKIPPED(String),
}

/// Trait to define structs as runnable async crons with tokio_scheduler
#[async_trait]
pub trait Command: Send + Sync {
    /// returns the current command name
    fn get_command_name(&self) -> String;

    /// returns the current command argument payload
    fn get_command_args(&self) -> Option<HashMap<String, String>>;

    /// returns the "cron_middleware"
    fn get_cron_middleware(&self) -> CronLogMiddleware<CronLogRepository>;

    /// real body for the command execution, must be overridden in impls.
    async fn do_run(&self) -> Result<CommandResult>;

    /// starts the command process by validating command lock, and registering an open cron log into database.
    async fn begin(&self) -> Result<CronLog> {
        // ...
    }

    /// ends the command process by releasing command lock, and registering the result of the command to an opened cron log into database.
    async fn end(&self, cron_log: &CronLog, result: CommandResult) -> Result<()> {
        // ...
    }

    /// hidden runner of commands, uses begin, end and do_run, and will be used by runner.
    async fn run(&self) -> Result<()> {
        // ...
    }

    /// generates a unique key for this command name + args, for locks purposes
    fn generate_unicity_key(&self) -> String {
        // ...
    }

    /// converts command args as a string payload
    #[allow(clippy::or_fun_call)]
    fn get_command_args_as_string(&self) -> String {
        // ...
    }
}

/// struct to move a command + its cron schedule into scheduler.
pub struct CommandHandle<T: Command + ?Sized + Send + Sync> {
    pub command: Box<T>,
    pub schedule: String,
}
Then, for testing purposes, I created a test command struct like this:
/// a testing purpose command
pub struct TestCommand {
    pub name: String,
    pub args: Option<HashMap<String, String>>,
    pub cron_log_middleware: CronLogMiddleware<CronLogRepository>,
}

#[async_trait]
impl Command for TestCommand {
    // accessors (get_... functions)

    async fn do_run(&self) -> Result<CommandResult> {
        debug!("executed !");
        Ok(CommandResult::SUCCESS)
    }
}
The Rocket builder looks like this:
let mut sched = CronScheduler::default();
sched.add_cron(CommandHandle {
    command: Box::new(TestCommand {
        name: "app:test".to_string(),
        args: None,
        cron_log_middleware: cron_log_middleware.clone(),
    }),
    schedule: "*/1 * * * *".to_string(),
});
// then I add sched to rocket with .manage()
And the fairing looks like this:
/// a rocket fairing enabling async tasks (eg crons) while rocket is launching
#[derive(Default)]
pub struct CronScheduler {
    crons: Vec<CommandHandle<dyn Command>>,
}

impl CronScheduler {
    /// adds a cron (eg CommandHandle with a given command) to run with the scheduler.
    pub fn add_cron(&mut self, cron: CommandHandle<dyn Command>) {
        self.crons.push(cron);
    }
}

#[rocket::async_trait]
impl Fairing for CronScheduler {
    //...
    //                  v -- error is here
    async fn on_liftoff(&self, _rocket: &Rocket<Orbit>) {
        let sched = SchedulerBuilder::build().await;
        for handle in self.crons.iter() {
            let job = Job::new_cron_job_async(handle.schedule.as_str(), |_uid, _lock| {
                Box::pin(async move {
                    handle.command.run().await;
                })
            })
            .unwrap();
            sched.add(job).await.unwrap();
        }
        sched.start().await.unwrap();
    }
}
Aaaand I get this error:
error[E0759]: `self` has lifetime `'life0` but it needs to satisfy a `'static` lifetime requirement
  --> src/core/fairings/cron_scheduler.rs:34:26
   |
34 |       async fn on_liftoff(&self, rocket: &Rocket<Orbit>) {
   |                            ^^^^ this data with lifetime `'life0`...
...
39 | /                 Box::pin(async move {
40 | |                     handle.command.run().await;
41 | |                 })
   | |__________________- ...is used and required to live as long as `'static` here
   |
note: `'static` lifetime requirement introduced by the return type
  --> src/core/fairings/cron_scheduler.rs:34:5
   |
34 |       async fn on_liftoff(&self, rocket: &Rocket<Orbit>) {
   |       ^^^^^ requirement introduced by this return type
...
39 | /                 Box::pin(async move {
40 | |                     handle.command.run().await;
41 | |                 })
   | |__________________- because of this returned expression
And my Cargo.toml, shortened:
[dependencies]
rocket = {version = "0.5.0-rc.2", features = ["json"]}
# ...
tokio-cron-scheduler = {version = "0.8.1", features = ["signal"]}
# ...
I tried different solutions and noticed that the Command is what causes the issue: if I replace the content of the Box::pin(...) with something like a println!, the code compiles.
I don't know whether it's a conflict between async-trait and Rocket's async traits or something else, but I cannot figure it out.
Edit 1: shortened a lot of the code because the post was way too long.
Edit 2: solution found thanks to the accepted answer; here is the final patched code, in case it helps anyone.
FTR, I had to implement Clone myself (not using the derive macro), using the code of the accepted answer as a reference.
// command
pub struct CommandHandle<T: Command + ?Sized + Send + Sync> {
    pub command: Arc<T>,
    pub schedule: String,
}

impl<T> Clone for CommandHandle<T>
where
    T: Command + ?Sized + Send + Sync,
{
    fn clone(&self) -> Self {
        Self {
            command: self.command.clone(),
            schedule: self.schedule.clone(),
        }
    }
}

// fairing
async fn on_liftoff(&self, _rocket: &Rocket<Orbit>) {
    let sched = SchedulerBuilder::build().await;
    for handle in self.crons.iter() {
        let schedule = handle.schedule.clone();
        let handle = handle.clone();
        let job = Job::new_cron_job_async(schedule.as_str(), move |_uid, _lock| {
            let handle = handle.clone();
            Box::pin(async move {
                handle.command.run().await.unwrap();
            })
        })
        .unwrap();
        sched.add(job).await.unwrap();
    }
    sched.start().await.unwrap();
}
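For context on why a hand-written Clone is needed here: #[derive(Clone)] adds a `T: Clone` bound to the generated impl, and `dyn Command` does not implement Clone, so the derived impl never applies to `CommandHandle<dyn Command>`. Cloning an Arc needs no such bound. Below is a minimal, dependency-free sketch of that point; the one-method `Command` trait, the unit `TestCommand` and the `clone_dyn_handle` helper are simplified stand-ins assumed for illustration, not the real code from the question.

```rust
use std::sync::Arc;

// Simplified stand-in for the question's `Command` trait (assumption).
trait Command: Send + Sync {
    fn name(&self) -> String;
}

struct TestCommand;

impl Command for TestCommand {
    fn name(&self) -> String {
        "app:test".to_string()
    }
}

struct CommandHandle<T: Command + ?Sized> {
    command: Arc<T>,
    schedule: String,
}

// Manual impl: no `T: Clone` bound, because cloning an `Arc<T>` only
// bumps the reference count and never clones the `T` behind it.
impl<T: Command + ?Sized> Clone for CommandHandle<T> {
    fn clone(&self) -> Self {
        Self {
            command: Arc::clone(&self.command),
            schedule: self.schedule.clone(),
        }
    }
}

// Hypothetical helper: builds a handle over a trait object, clones it, and
// reports what the clone sees plus the Arc's strong count.
fn clone_dyn_handle() -> (String, String, usize) {
    let command: Arc<dyn Command> = Arc::new(TestCommand);
    let handle: CommandHandle<dyn Command> = CommandHandle {
        command,
        schedule: "*/1 * * * *".to_string(),
    };
    let copy = handle.clone();
    (
        copy.command.name(),
        copy.schedule.clone(),
        Arc::strong_count(&handle.command),
    )
}

fn main() {
    println!("{:?}", clone_dyn_handle());
}
```

Both handles point at the same command instance, which is what lets each scheduled job own its own handle cheaply.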
Upvotes: 2
Views: 669
Reputation: 60132
Job::new_cron_job_async requires that your closure is 'static, but it is not, since handle is a reference into self.crons.
Taking a quick look at your structure, if you use Arc instead of Box in CommandHandle, then it is easily cloned and thus can give a 'static handle to the cron job:
pub struct CommandHandle<T: Command + ?Sized + Send + Sync> {
    pub command: Arc<T>, // <------------
    pub schedule: String,
}

impl Clone for CommandHandle ...

for handle in self.crons.iter() {
    let schedule = handle.schedule.clone(); // owned copy: `handle` is moved into the closure below
    let handle = handle.clone(); // <------------                vvvv
    let job = Job::new_cron_job_async(schedule.as_str(), move |_uid, _lock| {
        let handle = handle.clone(); // <------------
        Box::pin(async move {
            handle.command.run().await;
        })
    })
    .unwrap();
    sched.add(job).await.unwrap();
}
It's hard to verify since your posted code is incomplete, but I believe you need both clones above: the closure needs ownership of the handle to be 'static, but it must also be FnMut in order to be called multiple times, so the handle can't be moved into the async block directly.
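The two-clone pattern can be tried without Rocket or tokio-cron-scheduler. The sketch below uses only the standard library; `fire_twice` is a hypothetical stand-in for a scheduler that demands an `FnMut` + `'static` closure returning a boxed future, and `Handle`, `Registry` and `block_on_ready` are likewise names assumed for illustration. It is not the scheduler's real API.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

type JobFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls a future once; sufficient here because the futures never suspend.
fn block_on_ready(mut fut: JobFuture) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    assert!(matches!(fut.as_mut().poll(&mut cx), Poll::Ready(())));
}

// Hypothetical scheduler entry point: the job must own its captures
// ('static) and be callable more than once (FnMut).
fn fire_twice<F>(mut job: F)
where
    F: FnMut() -> JobFuture + Send + 'static,
{
    for _ in 0..2 {
        block_on_ready(job());
    }
}

#[derive(Clone)]
struct Handle {
    name: Arc<str>,
    log: Arc<Mutex<Vec<String>>>,
}

struct Registry {
    handles: Vec<Handle>,
}

impl Registry {
    fn start(&self) {
        for handle in self.handles.iter() {
            // Clone 1: the closure owns this copy, so it no longer borrows `self`.
            let handle = handle.clone();
            fire_twice(move || {
                // Clone 2: each call hands a fresh copy to its own future,
                // leaving the closure's copy in place for the next call.
                let handle = handle.clone();
                Box::pin(async move {
                    handle.log.lock().unwrap().push(handle.name.to_string());
                })
            });
        }
    }
}

// Runs one registered handle and returns what the jobs logged.
fn demo() -> Vec<String> {
    let log = Arc::new(Mutex::new(Vec::new()));
    let registry = Registry {
        handles: vec![Handle { name: Arc::from("app:test"), log: log.clone() }],
    };
    registry.start();
    let entries = log.lock().unwrap().clone();
    entries
}

fn main() {
    println!("{:?}", demo());
}
```

Removing the inner clone makes the closure move its only copy into the first future, so it would be `FnOnce` rather than `FnMut` and `fire_twice` would reject it. Removing the outer clone brings back a borrow of `self` that cannot satisfy `'static`.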
Upvotes: 2