I would like to implement a long-running background task that can report progress to other Actors. I have already accomplished that.
But I would also like to be able to cancel the long-running background task.
What I got so far is this:
use actix::prelude::*;
use std::{thread, time};

// Runs in a SyncContext, i.e. on its own thread, so its handlers may block.
struct Worker {}

impl Actor for Worker {
    type Context = SyncContext<Self>;
}

struct Manager {
    worker: Addr<Worker>,
}

impl Actor for Manager {
    type Context = Context<Self>;
}

// SystemService also requires `Manager: Default`; that impl is omitted here.
impl Supervised for Manager {}

impl SystemService for Manager {
    fn service_started(&mut self, _ctx: &mut Context<Self>) {}
}

struct Work {}

#[derive(Message)]
#[rtype(result = "()")]
struct PerformWork(Work);

#[derive(Message)]
#[rtype(result = "()")]
pub struct ReportProgress(i32);

impl Handler<PerformWork> for Worker {
    type Result = ();

    fn handle(&mut self, _msg: PerformWork, _ctx: &mut Self::Context) -> Self::Result {
        for i in 0..10000000 {
            // Report progress
            Manager::from_registry().do_send(ReportProgress(i));
            // Do some very slow I/O.
            thread::sleep(time::Duration::from_millis(1));
        }
    }
}

impl Handler<ReportProgress> for Manager {
    type Result = ();

    fn handle(&mut self, _msg: ReportProgress, _ctx: &mut Self::Context) -> Self::Result {
        // Do something with the progress here
    }
}
The Manager also handles a Message that sends the PerformWork Message to the Worker.
I thought of giving the ReportProgress Message a bool return type that would allow the Worker to decide whether it should break out of its loop. However, I cannot manage to send a Message with a return result to the Manager.
Using send() instead of do_send() returns a Future that I cannot resolve within the SyncContext.
Any ideas are very much appreciated.
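The request/reply idea described above can be sketched without actix, using plain std threads and channels as stand-ins for the two actors. This is only an illustration under stated assumptions: the `reply` field, `perform_work` and `run` are hypothetical names, not actix API. Inside actix, the closest equivalent would presumably be to block on the future returned by send() (for example with futures::executor::block_on), since a SyncContext actor owns its thread; that part is an untested assumption.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Hypothetical progress message: the step reached plus a one-shot reply
/// channel on which the manager answers `true` (continue) or `false` (stop).
struct ReportProgress {
    step: u32,
    reply: Sender<bool>,
}

/// Worker loop: reports every step, then blocks until the manager replies.
/// Returns the number of steps that were completed.
fn perform_work(total: u32, manager: Sender<ReportProgress>) -> u32 {
    let mut done = 0;
    for step in 0..total {
        // ... one unit of slow, blocking I/O would go here ...
        done += 1;

        let (reply_tx, reply_rx) = channel();
        if manager.send(ReportProgress { step, reply: reply_tx }).is_err() {
            break; // the manager is gone, nobody is listening any more
        }
        // Blocking is acceptable here because the worker owns its thread.
        match reply_rx.recv() {
            Ok(true) => continue,
            _ => break, // told to stop, or the manager dropped the reply
        }
    }
    done
}

/// Runs a worker against a manager that cancels once `cancel_at` is reported.
fn run(total: u32, cancel_at: u32) -> u32 {
    let (tx, rx) = channel::<ReportProgress>();
    let worker = thread::spawn(move || perform_work(total, tx));

    // Manager side: answer each report with "keep going?".
    // The loop ends when the worker finishes and drops its sender.
    for msg in rx {
        let keep_going = msg.step < cancel_at;
        // The worker may already have exited; ignore a failed reply.
        let _ = msg.reply.send(keep_going);
    }
    worker.join().expect("worker panicked")
}

fn main() {
    println!("completed {} steps", run(1_000, 10));
}
```

The cost of this design is one round trip per progress report, so the worker is only as fast as the manager answers; reporting every N steps would reduce that overhead.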
A bit more background:
Upvotes: 1
Views: 1171
I found a solution, but I am not convinced that it is a good one.
I added an Arc<AtomicBool> that is passed to the Worker along with the work. The Manager keeps a reference to the AtomicBool and can modify it. The Worker breaks out of its loop once the Manager sets the AtomicBool to true.
use actix::prelude::*;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{thread, time};

struct Worker {}

impl Actor for Worker {
    type Context = SyncContext<Self>;
}

struct Manager {
    worker: Addr<Worker>,
}

impl Actor for Manager {
    type Context = Context<Self>;
}

// SystemService also requires `Manager: Default`; that impl is omitted here.
impl Supervised for Manager {}

impl SystemService for Manager {
    fn service_started(&mut self, _ctx: &mut Context<Self>) {}
}

struct Work {}

// The second field is the cancellation flag shared with the Manager.
#[derive(Message)]
#[rtype(result = "()")]
struct PerformWork(Work, Arc<AtomicBool>);

#[derive(Message)]
#[rtype(result = "()")]
pub struct ReportProgress(i32);

impl Handler<PerformWork> for Worker {
    type Result = ();

    fn handle(&mut self, msg: PerformWork, _ctx: &mut Self::Context) -> Self::Result {
        for i in 0..10000000 {
            // Report progress
            Manager::from_registry().do_send(ReportProgress(i));
            // Stop as soon as the Manager has set the flag.
            if msg.1.load(Ordering::Relaxed) {
                break;
            }
            // Do some very slow I/O.
            thread::sleep(time::Duration::from_millis(1));
        }
    }
}

impl Handler<ReportProgress> for Manager {
    type Result = ();

    fn handle(&mut self, _msg: ReportProgress, _ctx: &mut Self::Context) -> Self::Result {
        // Do something with the progress here
    }
}
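For reference, here is the same flag-based cancellation reduced to plain std threads, with the actix parts left out. It is a minimal sketch, not the code above: `perform_work` is a hypothetical free function standing in for the Worker's handler, and `main` plays the role of the Manager.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Worker loop that checks a shared flag before every step.
/// Returns how many steps ran before it finished or was cancelled.
fn perform_work(total: u32, cancelled: Arc<AtomicBool>) -> u32 {
    let mut done = 0;
    for _ in 0..total {
        if cancelled.load(Ordering::Relaxed) {
            break;
        }
        // Stand-in for slow, blocking I/O.
        thread::sleep(Duration::from_millis(1));
        done += 1;
    }
    done
}

fn main() {
    let cancelled = Arc::new(AtomicBool::new(false));
    let worker = {
        // The worker gets its own handle to the same flag.
        let cancelled = Arc::clone(&cancelled);
        thread::spawn(move || perform_work(10_000, cancelled))
    };

    // "Manager" side: let the worker run briefly, then request cancellation.
    thread::sleep(Duration::from_millis(50));
    cancelled.store(true, Ordering::Relaxed);

    let done = worker.join().expect("worker panicked");
    println!("stopped after {} of 10000 steps", done);
}
```

Relaxed ordering should be enough here because the flag is the only data shared through it. Creating a fresh flag for each job avoids a stale cancellation leaking into the next one.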
Upvotes: 1