Bonanov

Reputation: 115

How to deal with tokio::spawn closure required to be 'static and &self?

I'm having trouble understanding how to write concurrent async code encapsulated in one single structure.

I'm not sure how to explain the problem exactly, so I'll try to do it with an example.

Let's say I have a UdpServer struct. This struct has multiple methods related to its behavior (e.g., handle_datagram, deserialize_datagram, etc.).
If I want to make the code concurrent, I spawn a Tokio task, which requires the future passed to it to be 'static. That means I can't use &self inside the task, because &self is not 'static, so I can't call self.deserialize_datagram().

I understand the problem (there is no guarantee the struct will outlive the task), but I can't see a proper way of solving it. I know it's possible to just move the function out of the impl block, but that doesn't look like a good solution to me.
Also, even if we assume for a moment that I could take &self as 'static, the code still wouldn't look right to me for some reason (not Rusty enough, I guess).
Another "solution" is to take self: Arc<Self> instead of &self, but this feels even worse.

So I'm assuming there is some pattern I'm not aware of. Can someone explain how I should refactor the whole thing?

Example code:

use tokio::net::UdpSocket;

struct Datagram;

struct UdpServer {
    addr: String,
}

impl UdpServer {
    pub async fn run(&self) {
        let socket = UdpSocket::bind(&self.addr).await.unwrap();
        loop {
            let mut buf: &mut [u8] = &mut [];
            let (_, _) = socket.recv_from(&mut buf).await.unwrap();

            // I spawn tokio task to enable concurrency
            tokio::spawn(async move {
                // But I can't use &self in here because it's not 'static.
                let datagram = self.deserialize_datagram(buf).await;
                self.handle_datagram(datagram).await;
            });
        }
    }

    pub async fn deserialize_datagram(&self, buf: &mut [u8]) -> Datagram {
        unimplemented!()
    }

    pub async fn handle_datagram(&self, datagram: Datagram) {
        unimplemented!()
    }
}

Upvotes: 10

Views: 11074

Answers (2)

Maksim Ryndin

Reputation: 51

You're right that Arc is the way to go. The Tokio tutorial mentions this solution:

If a single piece of data must be accessible from more than one task concurrently, then it must be shared using synchronization primitives such as Arc.
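
To make the quoted advice concrete, here is a minimal sketch of sharing one value across several spawned units of work through Arc. It uses std::thread::spawn rather than tokio::spawn so that it runs without any external crate; both have the same 'static requirement on what is spawned. The Config type and the greet_from_workers function are hypothetical names made up for this illustration.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical shared state; the name and field are illustrative only.
struct Config {
    greeting: String,
}

/// Spawns `n` threads that all read the same `Config` through an `Arc`.
/// `thread::spawn` needs a `'static` closure, much like `tokio::spawn`
/// needs a `'static` future, so each thread owns a cloned handle
/// instead of borrowing the value.
fn greet_from_workers(config: Arc<Config>, n: usize) -> Vec<String> {
    let handles: Vec<_> = (0..n)
        .map(|id| {
            // Cloning the `Arc` only bumps a reference count.
            let config = Arc::clone(&config);
            thread::spawn(move || format!("{} from worker {}", config.greeting, id))
        })
        .collect();

    // Join in spawn order so the results line up with the worker ids.
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

With Tokio the shape should be the same: clone the Arc outside the async block and move the clone into it.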

Upvotes: 1

user4815162342

Reputation: 155156

Currently the only way to do it is to make self last arbitrarily long through the use of Arc. Since run() is a method on UdpServer, it requires the change to Arc<Self>, which you considered but rejected because it felt worse. Still, that's the way to do it:

pub async fn run(self: Arc<Self>) {
    let socket = UdpSocket::bind(&self.addr).await.unwrap();
    loop {
        let mut buf: &mut [u8] = &mut [];
        let (_, _) = socket.recv_from(&mut buf).await.unwrap();

        tokio::spawn({
            let me = Arc::clone(&self);
            async move {
                let datagram = me.deserialize_datagram(buf).await;
                me.handle_datagram(datagram).await;
            }
        });
    }
}
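
For the calling side, here is a rough, std-only sketch of the same self: Arc<Self> receiver, using threads in place of Tokio tasks so it can be run without a runtime. The Server type, its fields, and the Vec<Vec<u8>> input are assumptions made for the illustration, not part of the code above.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Illustrative stand-in for the server; the fields are assumptions.
struct Server {
    name: String,
    handled: AtomicUsize,
}

impl Server {
    /// Takes `self: Arc<Self>` so every spawned thread can own a handle.
    fn run(self: Arc<Self>, datagrams: Vec<Vec<u8>>) -> Vec<String> {
        let handles: Vec<_> = datagrams
            .into_iter()
            .map(|buf| {
                let me = Arc::clone(&self);
                // `me` and `buf` are moved in, so the closure is `'static`.
                thread::spawn(move || me.handle_datagram(&buf))
            })
            .collect();

        // Join in spawn order so replies match the input order.
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    }

    /// Ordinary `&self` method, callable through the `Arc`.
    fn handle_datagram(&self, buf: &[u8]) -> String {
        self.handled.fetch_add(1, Ordering::SeqCst);
        format!("{} got {} bytes", self.name, buf.len())
    }
}
```

The caller wraps the server once, e.g. let server = Arc::new(Server { .. }), and then calls Arc::clone(&server).run(..) if it wants to keep its own handle, or server.run(..) if it does not. Only run needs the Arc receiver; the other methods can keep taking &self.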


Interestingly, the smol async runtime might actually provide what you're looking for, because its executor carries a lifetime. That lifetime is associated with values from the caller's environment, and the futures spawned on the executor may refer to it. For example, this compiles:

use futures_lite::future;
use smol::{Executor, net::UdpSocket};

struct Datagram;

struct UdpServer {
    addr: String,
}

impl UdpServer {
    pub async fn run<'a>(&'a self, ex: &Executor<'a>) {
        let socket = UdpSocket::bind(&self.addr).await.unwrap();
        loop {
            let mut buf: &mut [u8] = &mut [];
            let (_, _) = socket.recv_from(&mut buf).await.unwrap();

            ex.spawn({
                async move {
                    let datagram = self.deserialize_datagram(buf).await;
                    self.handle_datagram(datagram).await;
                }
            }).detach();
        }
    }

    pub async fn deserialize_datagram(&self, _buf: &mut [u8]) -> Datagram {
        unimplemented!()
    }

    pub async fn handle_datagram(&self, _datagram: Datagram) {
        unimplemented!()
    }
}

fn main() {
    let server = UdpServer { addr: "127.0.0.1:8080".to_string() };
    let ex = Executor::new();
    // Drive the executor as well, so that the spawned tasks actually get polled.
    future::block_on(ex.run(server.run(&ex)));
}
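
The idea of a spawner that carries a lifetime also exists in the standard library for threads: std::thread::scope (stable since Rust 1.63) lets spawned threads borrow from the caller because the scope joins them all before it returns. The sketch below is only an analogy, not a drop-in replacement for an async accept loop, since the scope blocks until every thread has finished. ScopedServer and its field are hypothetical names.

```rust
use std::thread;

/// Illustrative stand-in for the server; the field is an assumption.
struct ScopedServer {
    name: String,
}

impl ScopedServer {
    /// Plain `&self` receiver: scoped threads may borrow `self` and the
    /// input slice because `thread::scope` joins them before returning.
    fn run(&self, datagrams: &[Vec<u8>]) -> Vec<String> {
        thread::scope(|s| {
            let handles: Vec<_> = datagrams
                .iter()
                // No `Arc` and no `'static` bound: the closure only
                // captures references that outlive the scope.
                .map(|buf| s.spawn(move || self.handle_datagram(buf)))
                .collect();

            // Join in spawn order so replies match the input order.
            handles.into_iter().map(|h| h.join().unwrap()).collect()
        })
    }

    fn handle_datagram(&self, buf: &[u8]) -> String {
        format!("{} got {} bytes", self.name, buf.len())
    }
}
```

As with the smol executor, the borrow is sound because the spawned work cannot outlive the thing that spawned it.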

Upvotes: 6
