Reputation: 9349
Given a basic setup of a WebSocket server with Actix, how can I launch a daemon inside my message handler?
I've extended the example starter code linked above to call daemon(false, true) using the fork crate.
use actix::{Actor, StreamHandler};
use actix_web::{web, App, Error, HttpRequest, HttpResponse, HttpServer};
use actix_web_actors::ws;
use fork::{daemon, Fork};

/// Define HTTP actor
struct MyWs;

impl Actor for MyWs {
    type Context = ws::WebsocketContext<Self>;
}

/// Handler for ws::Message message
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWs {
    fn handle(
        &mut self,
        msg: Result<ws::Message, ws::ProtocolError>,
        ctx: &mut Self::Context,
    ) {
        match msg {
            Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
            Ok(ws::Message::Text(text)) => {
                println!("text message received");
                if let Ok(Fork::Child) = daemon(false, true) {
                    println!("from daemon: this print but then the websocket crashes!");
                };
                ctx.text(text)
            },
            Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
            _ => (),
        }
    }
}

async fn index(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, Error> {
    let resp = ws::start(MyWs {}, &req, stream);
    println!("{:?}", resp);
    resp
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| App::new().route("/ws/", web::get().to(index)))
        .bind("127.0.0.1:8080")?
        .run()
        .await
}
The above code starts the server, but when I send it a message, I receive a Panic in Arbiter thread.
text message received
from daemon: this print but then the websocket crashes!
thread 'actix-rt:worker:0' panicked at 'failed to park', /Users/xxx/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.25/src/runtime/basic_scheduler.rs:158:56
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Panic in Arbiter thread.
Upvotes: 3
Views: 1194
Reputation: 15683
The issue with your application is that the actix-web runtime (i.e. Tokio) is multi-threaded. This is a problem because the fork() call (used internally by daemon()) only replicates the thread that called fork().
Even if your parent process has N threads, your child process will have only 1. If any mutexes in your parent process are locked by those other threads, their locked state will be replicated in the child process, but since those threads do not exist there, the mutexes will remain locked forever.
If you have an Rc/Arc, it will never deallocate its memory, because the clones owned by the missing threads will never be dropped, so its internal count will never reach zero. The same applies to any pointers and shared state.
Put more simply, your forked child will end up in an undefined state.
This is best explained in Calling fork() in a Multithreaded Environment:
The fork( ) system call creates an exact duplicate of the address space from which it is called, resulting in two address spaces executing the same code. Problems can occur if the forking address space has multiple threads executing at the time of the fork( ). When multithreading is a result of library invocation, threads are not necessarily aware of each other's presence, purpose, actions, and so on. Suppose that one of the other threads (any thread other than the one doing the fork( )) has the job of deducting money from your checking account. Clearly, you do not want this to happen twice as a result of some other thread's decision to call fork( ).
Because of these types of problems, which in general are problems of threads modifying persistent state, POSIX defined the behavior of fork( ) in the presence of threads to propagate only the forking thread. This solves the problem of improper changes being made to persistent state. However, it causes other problems, as discussed in the next paragraph.
In the POSIX model, only the forking thread is propagated. All the other threads are eliminated without any form of notice; no cancels are sent and no handlers are run. However, all the other portions of the address space are cloned, including all the mutex state. If the other thread has a mutex locked, the mutex will be locked in the child process, but the lock owner will not exist to unlock it. Therefore, the resource protected by the lock will be permanently unavailable.
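The permanently locked mutex described in that quote can be imitated inside a single process. This is only a rough sketch under stated assumptions: it does not call fork() (that would need a libc binding), and instead simulates the vanished lock owner by leaking the guard with std::mem::forget. The function name and the balance value are made up for illustration.

```rust
use std::sync::{Arc, Mutex, TryLockError};
use std::thread;

/// Simulates a lock owner that disappears without ever unlocking.
/// Returns true if the mutex is still locked afterwards.
/// (Hypothetical helper; fork() itself is NOT called here.)
fn lock_survives_vanished_owner() -> bool {
    let balance = Arc::new(Mutex::new(100_i64));

    let owner = {
        let balance = Arc::clone(&balance);
        thread::spawn(move || {
            let guard = balance.lock().unwrap();
            // Leak the guard so the unlock never runs. This stands in for
            // a thread that simply does not exist in the forked child.
            std::mem::forget(guard);
        })
    };
    owner.join().unwrap();

    // The owner is gone, but the lock state it left behind is not.
    // A blocking lock() here would wait forever, so only try_lock() is used.
    let still_locked = matches!(balance.try_lock(), Err(TryLockError::WouldBlock));
    still_locked
}
```

Calling lock_survives_vanished_owner() should report that the mutex is still held even though no thread can ever release it, which is the situation a forked child inherits for every lock that another thread held at the moment of the fork.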
Here you can find a more reputable source with more details
"how can I launch a daemon inside my message handler?"
I assume you want to implement the classic Unix "fork() on accept()" model. In that case you are out of luck, because servers such as actix-web, and async/await in general, are not designed with that in mind. Even if you have a single-threaded async/await server:
When a child is forked, it inherits all file descriptors from the parent. So it's common for the child to close its listening socket after a fork in order to avoid a resource leak, but there is no way to do that on any of the async/await based servers, not because it's impossible to do, but because it's not implemented.
An even more important reason to do that is to prevent the child process from accepting new connections: even if you run a single-threaded server, it's still capable of processing many tasks concurrently, i.e. when your handler calls .await on something, the acceptor is free to accept a new connection (by stealing it from the socket's queue) and start processing it.
Your parent server may have already spawned a lot of tasks, and those would be replicated in each forked child, thus executing the very same thing multiple times, independently in each process.
And well... there is no way to prevent any of that on any of the async/await based servers I'm familiar with. You would need a custom server that:
In other words - async/await and "fork() on accept()" are two different and incompatible models for processing tasks concurrently.
A possible solution would be to have a non-async acceptor daemon that only accepts connections and forks itself, then spawns a web server in the child and feeds it the accepted socket. But although possible, none of the servers currently support that.
Upvotes: 7
Reputation: 166
As described in the other answer, the async runtime you're relying on may completely break if you touch it in the child process. Touching anything can completely break assumptions the actix or tokio devs made. Wacky stuff will happen if you so much as return from the function.
See this response by one of the key authors of tokio to someone doing something similar (calling fork() in the context of a threadpool with hyper):
Threads + fork is bad news... you can fork if you immediately exec and do not allocate memory or perform any other operation that may have been corrupted by the fork.
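The "fork, then immediately exec" pattern from that quote is what std::process::Command::spawn handles for you on Unix, so the calling code never runs anything in the half-copied child. Below is a minimal sketch, assuming the long-running job can be started as a separate executable. launch_detached is a hypothetical helper name, and the std version is used only to keep the example self-contained; tokio::process::Command exposes a similar builder for use inside async code.

```rust
use std::io;
use std::process::{Child, Command, Stdio};

/// Launches `program` as a separate OS process and returns immediately.
/// (Hypothetical helper, not part of actix or tokio.)
fn launch_detached(program: &str, args: &[&str]) -> io::Result<Child> {
    Command::new(program)
        .args(args)
        // Detach the standard streams so the child does not share the
        // server's terminal or pipes.
        .stdin(Stdio::null())
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        // The process creation happens inside the standard library;
        // no manual fork() is involved.
        .spawn()
}
```

Dropping the returned Child handle does not kill the process, so something like launch_detached("sleep", &["3600"]) keeps running after the handler returns. Note that this is not full daemonization: the child stays in the parent's session and process group, so it can still receive terminal signals unless it detaches itself.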
Going back to your question:
The objective is for my websocket to respond to messages and be able to launch isolated long-running processes that launch successfully and do not exit when the websocket exits.
I don't think you want to manually fork() at all. Utility functions provided by actix/tokio should integrate well with their runtimes. You may:
Run blocking work on a thread pool with actix_web::block.
Spawn a future on the actor's context with actix::AsyncContext::spawn. You would ideally want to use e.g. tokio::process::Command rather than the std version to avoid blocking in an async context.
If all you need is Command::new() and later Command::spawn(), I'm pretty sure you can just call it directly. There's no need to fork; it does that internally.
Upvotes: 2