ibse
ibse

Reputation: 583

How to run an asynchronous task from a non-main thread in Tokio?

use std::thread;
use tokio::task; // 0.3.4

#[tokio::main]
async fn main() {
    thread::spawn(|| {
        task::spawn(async {
            println!("123");
        });
    })
    .join();
}

When compiling I get a warning:

warning: unused `std::result::Result` that must be used
  --> src/main.rs:6:5
   |
6  | /     thread::spawn(|| {
7  | |         task::spawn(async {
8  | |             println!("123");
9  | |         });
10 | |     })
11 | |     .join();
   | |____________^
   |
   = note: `#[warn(unused_must_use)]` on by default
   = note: this `Result` may be an `Err` variant, which should be handled

And when executing I get an error:

thread '<unnamed>' panicked at 'must be called from the context of Tokio runtime configured with either `basic_scheduler` or `threaded_scheduler`', src/main.rs:7:9

Upvotes: 22

Views: 26586

Answers (2)

Shepmaster
Shepmaster

Reputation: 430711

The key piece is that you need to get a Tokio Handle. This is a reference to a Runtime and it allows you to spawn asynchronous tasks from outside of the runtime.

When using #[tokio::main], the simplest way to get a Handle is via Handle::current before spawning another thread then give the handle to each thread that might want to start an asynchronous task:

use std::thread;
use tokio::runtime::Handle; // 0.3.4

#[tokio::main]
async fn main() {
    let threads: Vec<_> = (0..3)
        .map(|thread_id| {
            let handle = Handle::current();

            thread::spawn(move || {
                eprintln!("Thread {} started", thread_id);

                for task_id in 0..3 {
                    handle.spawn(async move {
                        eprintln!("Thread {} / Task {}", thread_id, task_id);
                    });
                }

                eprintln!("Thread {} finished", thread_id);
            })
        })
        .collect();

    for t in threads {
        t.join().expect("Thread panicked");
    }
}

You could also create a global, mutable singleton of a Mutex<Option<Handle>>, initialize it to None, then set it to Some early in your tokio::main function. Then, you can grab that global variable, unwrap it, and clone the Handle when you need it:

use once_cell::sync::Lazy; // 1.5.2

static HANDLE: Lazy<Mutex<Option<Handle>>> = Lazy::new(Default::default);
*HANDLE.lock().unwrap() = Some(Handle::current());
let handle = HANDLE.lock().unwrap().as_ref().unwrap().clone();

See also:

Upvotes: 26

evanxg852000
evanxg852000

Reputation: 289

I have a job processing app that exposes a web API to add jobs and process them but the API request should not wait for the job to finish (it could take a while). I use Server-Sent Events to broadcast the job result. This means the main API server is executing inside main with #[tokio::main], but where should I be running the job executor? In the job executor, I will have plenty of waiting: things like downloading. They will interfere with the web API server. The crucial question is how do I even start both executions in parallel?

In this scenario, you need to create a separate thread with thread::spawn inside which you will create a Tokio executor. The error you get is that inside your second thread, there is no Tokio executor (runtime). You need to create one manually and tell it to run your tasks. The easier way is to use the Runtime API:

use tokio::runtime::Runtime; // 0.2.23

// Create the runtime
let rt = Runtime::new().unwrap();

// Spawn a future onto the runtime
rt.spawn(async {
    println!("now running on a worker thread");
});

In your main thread, an executor is already available with the use of #[tokio::main]. Prior to the addition of this attribute, the runtime was created manually.

If you want to stick with the async/await philosophy, you can use join:

use tokio; // 0.2.23

#[tokio::main]
async fn main() {
    let (_, _) = tokio::join!(start_server_listener(), start_job_processor());
}

This is why most answers are questioning your approach. Although very rare, I believe there are scenarios where you want an async runtime to be on another thread while also having the benefit to manually configure the runtime.

Upvotes: 13

Related Questions