Jonas Wolf
Jonas Wolf

Reputation: 1214

How to send a message to another actor from actix SyncContext?

I would like to implement a long running background task that can report progress to other Actors. I already accomplished that. But I would also like to be able to cancel the long running background task again.

What I got so far is this:

use actix::prelude::*;

struct Worker {}

impl Actor for Worker {
    type Context = SyncContext<Self>;
}

struct Manager {
    worker: Addr<Worker>,
}

impl Actor for Manager {
    type Context = Context<Self>;
}

impl Supervised for Manager {}

impl SystemService for Manager {
    fn service_started(&mut self, _ctx: &mut Context<Self>) {}
}

struct Work {}

#[derive(Message)]
#[rtype(result = "()")]
struct PerformWork(Work);

#[derive(Message)]
#[rtype(result = "()")]
pub struct ReportProgress(i32);

impl Handler<PerformWork> for Worker {
    type Result = ();

    fn handle(&mut self, msg: PerformWork, ctx: &mut Self::Context) -> Self::Result {
        for i in 0..10000000 {
            // Report progress
            Manager::from_registry().do_send(ReportProgress(i));
            // Do some very slow I/O.
            thread::sleep(time::Duration::from_millis(1));
        }
    }
}

impl Handler<ReportProgress> for Manager {
    type Result = ();

    fn handle(&mut self, msg: ReportProgress, ctx: &mut Self::Context) -> Self::Result {
        // Do something with the progress here
    }
}

The Manager also handles a Message that sends the PerformWork Message to the Worker.

I thought of giving the ReportProgress Message a bool return type that would allow the Worker decide if it should break out of its loop. However, I cannot manage sending a Message with a return result to the Manager. Using send() instead of do_send() returns a Future that I cannot resolve within the SyncContext.

Any ideas are very much appreciated.

A bit more background:

Upvotes: 1

Views: 1171

Answers (1)

Jonas Wolf
Jonas Wolf

Reputation: 1214

I found a solution, but I am not convinced that it is a good one.

I added an Arc<AtomicBool>> that is passed to the Worker. The Manager keeps a reference to the AtomicBool and can modify it. The Worker breaks out of its loop if the AtomicBool is modified by the Manager.

use actix::prelude::*;
use std::sync::atomic::{AtomicBool, Ordering};

struct Worker {}

impl Actor for Worker {
    type Context = SyncContext<Self>;
}

struct Manager {
    worker: Addr<Worker>,
}

impl Actor for Manager {
    type Context = Context<Self>;
}

impl Supervised for Manager {}

impl SystemService for Manager {
    fn service_started(&mut self, _ctx: &mut Context<Self>) {}
}

struct Work {}

#[derive(Message)]
#[rtype(result = "()")]
struct PerformWork(Work, Arc<AtomicBool>>);

#[derive(Message)]
#[rtype(result = "()")]
pub struct ReportProgress(i32);

impl Handler<PerformWork> for Worker {
    type Result = ();

    fn handle(&mut self, msg: PerformWork, ctx: &mut Self::Context) -> Self::Result {
        for i in 0..10000000 {
            // Report progress
            Manager::from_registry().do_send(ReportProgress(i));
            if msg.1.load(Ordering::Relaxed) {
                break;
            }
            // Do some very slow I/O.
            thread::sleep(time::Duration::from_millis(1));
        }
    }
}

impl Handler<ReportProgress> for Manager {
    type Result = ();

    fn handle(&mut self, msg: ReportProgress, ctx: &mut Self::Context) -> Self::Result {
        // Do something with the progress here
    }
}

Upvotes: 1

Related Questions