Reputation: 715
I am trying to have a struct that starts an event loop, listens for TCP connections and calls a callback for each connection.
(The callback will be handed some preprocessed data from the socket. In my example below I just hand it the IP address of the connection, but in my real code I will parse the contents that I receive with serde into a struct and pass that into the callback. I hope that doesn't invalidate the following "not working example".)
My Cargo.toml:
[package]
name = "lifetime-problem"
version = "0.1.0"
edition = "2018"
[dependencies]
tokio-tcp = "0.1.3"
tokio = "0.1.14"
[[bin]]
name = "lifetime-problem"
path = "main.rs"
and main.rs:
use tokio::prelude::*;

struct Test {
    printer: Option<Box<Fn(std::net::SocketAddr) + Sync>>,
}

impl Test {
    pub fn start(&mut self) -> Result<(), Box<std::error::Error>> {
        let addr = "127.0.0.1:4242".parse::<std::net::SocketAddr>()?;
        let listener = tokio::net::TcpListener::bind(&addr)?;

        let server = listener
            .incoming()
            .map_err(|e| eprintln!("failed to accept socket; error = {:?}", e))
            .for_each(move |socket: tokio::net::TcpStream| {
                let address = socket.peer_addr().expect("");
                match self.printer {
                    Some(callback) => { callback(address); }
                    None => { println!("{}", address); }
                }
                Ok(())
            });

        tokio::run(server);
        Ok(())
    }
}

fn main() {
    let mut x = Test{ printer: None };
    x.start();
}
I have tried several things starting from this code (which is adapted directly from the Tokio example).
If I use the code like posted above I get:
error[E0277]: (dyn std::ops::Fn(std::net::SocketAddr) + std::marker::Sync + 'static) cannot be sent between threads safely
for line 24 (tokio::run(server)).
If I add the Send trait on the Fn in the printer field XOR if I remove the move in the closure in the for_each call, I get another error instead:
error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
which points me to the closure that apparently cannot outlive the start method where it is defined, but tokio::run seems to have conflicting requirements for it.
Do you know if I am addressing the callback pattern in totally the wrong way or if there is just some minor error in my code?
Upvotes: 1
Views: 1680
Reputation: 822
First things first:
The compiler will treat Box<Fn(std::net::SocketAddr) + Sync> as Box<Fn(std::net::SocketAddr) + Sync + 'static> unless the lifetime is explicitly specified.
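To make the default bound concrete, here is a small sketch using only the standard library. The alias names StaticCallback and BorrowingCallback and the two helper functions are hypothetical, introduced just for illustration; they are not part of the question's code.

```rust
// No lifetime written: the trait object bound defaults to 'static,
// so the boxed closure may only own its captures (or borrow 'static data).
type StaticCallback = Box<dyn Fn(u32) -> u32 + Sync>;

// Explicit lifetime: the boxed closure may borrow data that lives for 'a.
type BorrowingCallback<'a> = Box<dyn Fn(u32) -> u32 + Sync + 'a>;

fn call_static(callback: &StaticCallback, x: u32) -> u32 {
    callback(x)
}

fn call_borrowing<'a>(callback: &BorrowingCallback<'a>, x: u32) -> u32 {
    callback(x)
}

fn main() {
    let offset = 10u32;
    // `move` copies `offset` into the closure, so nothing is borrowed.
    let owned: StaticCallback = Box::new(move |x: u32| x + offset);

    let local = 5u32;
    let local_ref = &local;
    // This closure holds a reference to `local`, which is not 'static.
    let borrowed: BorrowingCallback<'_> = Box::new(move |x: u32| x + *local_ref);

    // The same closure is rejected by the compiler for the 'static alias:
    // let bad: StaticCallback = Box::new(move |x: u32| x + *local_ref);

    println!("{}", call_static(&owned, 1));
    println!("{}", call_borrowing(&borrowed, 1));
}
```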
Let's have a look at the errors:
error[E0277]: (dyn std::ops::Fn(std::net::SocketAddr) + std::marker::Sync + 'static) cannot be sent between threads safely
This is self-explanatory. You are trying to move &mut T to another thread, but cannot, because T here is not Send. To send &mut T to another thread, T itself must be Send.
Here is the minimal code that will give the same error:
use std::fmt::Debug;

fn func<T> (i:&'static mut T) where T: Debug {
    std::thread::spawn(move || {
        println!("{:?}", i);
    });
}
If I also require T above to be Send, the error goes away.
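For reference, a sketch of that compiling variant. Returning the JoinHandle and obtaining the 'static reference through Box::leak are my additions so that the example can be run and checked; the explicit 'static bound on T is only spelled out for clarity.

```rust
use std::fmt::Debug;
use std::thread;

// `&'static mut T` is Send only if `T` is Send, hence the extra bound.
fn func<T>(i: &'static mut T) -> thread::JoinHandle<String>
where
    T: Debug + Send + 'static,
{
    // The closure owns the reference; the thread returns the formatted value.
    thread::spawn(move || format!("{:?}", i))
}

fn main() {
    // Box::leak trades a (deliberate) leak for a `&'static mut` reference.
    let value: &'static mut Vec<u32> = Box::leak(Box::new(vec![1, 2, 3]));
    let formatted = func(value).join().expect("thread panicked");
    println!("{}", formatted);
}
```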
But in your case, when you add the Send trait, it gives a lifetime error. Why?
&mut self has some lifetime, set by the caller, that is longer than the call to start(), but there's no guarantee that it's 'static. You move this reference into the closure, which is passed to another thread and can potentially outlive the scope it is closing over, leading to a dangling reference.
Here's a minimal version that gives the same error:
use std::fmt::Debug;

fn func<'a, T:'a> (i:&'a mut T) where T: Debug + Sync + Send {
    std::thread::spawn(move || {
        println!("{:?}", i);
    });
}
Sync is not really required here, since it is &mut T. Changing &mut T to &T (retaining Sync) will also result in the same error. The issue here is the reference, not mutability. So you see, there is some lifetime 'a and the reference is moved into a closure (given to a thread), which means the closure now contains a reference (disjoint from the main context). So what is 'a, and how long will it live from the perspective of a closure that is invoked from another thread? Not inferable! As a result, the compiler complains with cannot infer an appropriate lifetime due to conflicting requirements.
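As a contrast, the sketch below shows that the problem really is the unbounded lifetime of the spawned thread. With std::thread::scope (available in the standard library since Rust 1.63) the thread is guaranteed to be joined before the borrow ends, so a non-'static reference is accepted. This is only an illustration of the reasoning; it does not carry over to tokio::run, which requires a 'static future.

```rust
use std::fmt::Debug;
use std::thread;

// `i` has an arbitrary, caller-chosen lifetime, yet this compiles:
// the scope joins every thread it spawned before `thread::scope` returns.
fn func<T>(i: &T) -> String
where
    T: Debug + Sync,
{
    thread::scope(|s| {
        let handle = s.spawn(move || format!("{:?}", i));
        handle.join().expect("scoped thread panicked")
    })
}

fn main() {
    let data = vec![1, 2, 3];
    // `data` is a plain local, borrowed only for the duration of the call.
    println!("{}", func(&data));
}
```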
If we tweak the code a bit to:
impl Test {
    pub fn start(&'static mut self) -> Result<(), Box<std::error::Error>> {
        let addr = "127.0.0.1:4242".parse::<std::net::SocketAddr>()?;
        let listener = tokio::net::TcpListener::bind(&addr)?;

        let server = listener
            .incoming()
            .map_err(|e| eprintln!("failed to accept socket; error = {:?}", e))
            .for_each(move |socket: tokio::net::TcpStream| {
                let address = socket.peer_addr().expect("");
                match &self.printer {
                    Some(callback) => { callback(address); }
                    None => { println!("{}", address); }
                }
                Ok(())
            });

        tokio::run(server);
        Ok(())
    }
}
it will compile fine (with the Send bound from above still in place on printer). There's a guarantee there that self has a 'static lifetime. Please note that in the match statement we need to pass &self.printer, as you cannot move out of a borrowed context.
However, this expects Test to be declared static, and a mutable static at that, which is generally not the best way if you have other options.
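If a 'static reference is really wanted, one way to get it without a static mut item is Box::leak. The sketch below is an assumption-laden stand-in, not the Tokio code: std::thread::spawn plays the role of tokio::run, start takes a single hypothetical address instead of accepting sockets, and the demo helper and its channel exist only to observe the callback.

```rust
use std::net::SocketAddr;
use std::sync::mpsc;
use std::thread;

struct Test {
    printer: Option<Box<dyn Fn(SocketAddr) + Send>>,
}

impl Test {
    // std::thread::spawn stands in for tokio::run: both need Send + 'static.
    fn start(&'static mut self, address: SocketAddr) -> thread::JoinHandle<()> {
        thread::spawn(move || match &self.printer {
            Some(callback) => callback(address),
            None => println!("{}", address),
        })
    }
}

// Hypothetical helper: returns the address the callback was invoked with.
fn demo() -> SocketAddr {
    let (tx, rx) = mpsc::channel();
    // Box::leak yields `&'static mut Test`; the allocation is never freed.
    let test: &'static mut Test = Box::leak(Box::new(Test {
        printer: Some(Box::new(move |addr: SocketAddr| {
            let _ = tx.send(addr);
        })),
    }));
    let address: SocketAddr = "127.0.0.1:4242".parse().expect("valid socket address");
    test.start(address).join().expect("worker thread panicked");
    rx.recv().expect("callback was not invoked")
}

fn main() {
    println!("callback saw {}", demo());
}
```

The leaked allocation lives until the process exits, which is acceptable for a server object created once at startup but not for values created repeatedly.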
Another way: if it's OK for you to pass Test by value to start() and then move it into for_each(), the code would look like this:
use tokio::prelude::*;

struct Test {
    printer: Option<Box<Fn(std::net::SocketAddr) + Send>>,
}

impl Test {
    pub fn start(mut self) -> Result<(), Box<std::error::Error>> {
        let addr = "127.0.0.1:4242".parse::<std::net::SocketAddr>()?;
        let listener = tokio::net::TcpListener::bind(&addr)?;

        let server = listener
            .incoming()
            .map_err(|e| eprintln!("failed to accept socket; error = {:?}", e))
            .for_each(move |socket: tokio::net::TcpStream| {
                let address = socket.peer_addr().expect("");
                match &self.printer {
                    Some(callback) => {
                        callback(address);
                    }
                    None => {
                        println!("{}", address);
                    }
                }
                Ok(())
            });

        tokio::run(server);
        Ok(())
    }
}

fn main() {
    let mut x = Test { printer: None };
    x.start();
}
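To try the by-value pattern with an actual callback and without pulling in Tokio, here is a rough standard-library analogue. It is a sketch under assumptions, not the Tokio API: a blocking std::net::TcpListener on a worker thread replaces the event loop, the max_conns parameter is a hypothetical limit so the loop terminates, and demo binds to an OS-assigned loopback port.

```rust
use std::io;
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::mpsc;
use std::thread;

struct Test {
    printer: Option<Box<dyn Fn(SocketAddr) + Send>>,
}

impl Test {
    // Consumes `self`, so the worker thread owns the callback outright
    // and no reference with a limited lifetime is involved.
    fn start(self, listener: TcpListener, max_conns: usize) -> thread::JoinHandle<()> {
        thread::spawn(move || {
            for stream in listener.incoming().take(max_conns) {
                let address = match stream.and_then(|s| s.peer_addr()) {
                    Ok(address) => address,
                    Err(e) => {
                        eprintln!("failed to accept socket; error = {:?}", e);
                        continue;
                    }
                };
                match &self.printer {
                    Some(callback) => callback(address),
                    None => println!("{}", address),
                }
            }
        })
    }
}

// Connects once and returns (address the client used, address the callback saw).
fn demo() -> io::Result<(SocketAddr, SocketAddr)> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let server_addr = listener.local_addr()?;
    let (tx, rx) = mpsc::channel();
    let test = Test {
        // The closure owns the sender, so it is Send and has no borrows.
        printer: Some(Box::new(move |addr: SocketAddr| {
            let _ = tx.send(addr);
        })),
    };
    let worker = test.start(listener, 1);
    let client = TcpStream::connect(server_addr)?;
    let client_addr = client.local_addr()?;
    let seen = rx.recv().expect("callback was not invoked");
    worker.join().expect("worker thread panicked");
    Ok((client_addr, seen))
}

fn main() -> io::Result<()> {
    let (client_addr, seen) = demo()?;
    println!("client {} was reported as {}", client_addr, seen);
    Ok(())
}
```

Because the callback owns everything it captures, it satisfies the default 'static bound, and the Send bound is what allows it to move to the worker thread.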
Upvotes: 1