ruipacheco

Why does Tokio return the error "Cannot drop a runtime in a context where blocking is not allowed"?

I have a Tokio client that talks to a remote server and is supposed to keep the connection alive permanently. I've implemented the initial authentication handshake and found that when my test terminates, I get an odd panic:

---- test_connect_without_database stdout ----
thread 'test_connect_without_database' panicked at 'Cannot drop a runtime in a context where blocking is not allowed. This happens when a runtime is dropped from within an asynchronous context.', /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.3.5/src/runtime/blocking/shutdown.rs:51:21

I'm at an absolute loss as to what might be causing this so I don't know what other bits of code to add for context.

Here's my minimal reproducible example (playground):

use std::cell::RefCell;
use std::net::{IpAddr, SocketAddr};
use tokio::net::TcpStream;
use tokio::prelude::*;
use tokio::runtime;

#[derive(PartialEq, Debug)]
pub struct Configuration {
    /// Database username.
    username: String,
    /// Database password.
    password: String,
    /// Database name.
    db_name: String,
    /// IP address for the remote server.
    address: IpAddr,
    /// Remote server port.
    port: u16,
    /// Number of connections to open.
    connections: u16,
}

impl Configuration {
    pub fn new(
        username: &str,
        password: &str,
        db_name: &str,
        address: &str,
        port: u16,
        connections: u16,
    ) -> Result<Configuration, Box<dyn std::error::Error>> {
        let address = address.to_string().parse()?;
        let configuration = Configuration {
            username: username.to_string(),
            password: password.to_string(),
            db_name: db_name.to_string(),
            address,
            port,
            connections,
        };
        Ok(configuration)
    }

    pub fn socket(&self) -> SocketAddr {
        SocketAddr::new(self.address, self.port)
    }
}

#[derive(Debug)]
pub struct Session {
    configuration: Configuration,
    runtime: RefCell<runtime::Runtime>,
}

impl Session {
    fn new(config: Configuration) -> Result<Session, io::Error> {
        let runtime = runtime::Builder::new_multi_thread()
            .worker_threads(6)
            .enable_all()
            .build()?;
        let session = Session {
            configuration: config,
            runtime: RefCell::new(runtime),
        };
        Ok(session)
    }

    fn configuration(&self) -> &Configuration {
        &self.configuration
    }
}

#[derive(Debug)]
pub(crate) struct Connection<'a> {
    /// Session this connection belongs to.
    session: &'a Session,
    /// Connection to the remote server.
    stream: TcpStream,
    /// Flag that indicates whether the connection is live.
    live: bool,
}

impl<'a> Connection<'a> {
    async fn new(session: &Session) -> Result<Connection<'_>, Box<dyn std::error::Error>> {
        let mut stream = TcpStream::connect(session.configuration().socket()).await?;
        let conn = Connection {
            session,
            stream,
            live: true,
        };

        Ok(conn)
    }

    fn live(&self) -> bool {
        self.live
    }
}

#[tokio::test]
async fn test_connect_without_database() -> Result<(), Box<dyn std::error::Error>> {
    let config = Configuration::new("rust", "", "", "127.0.0.1", 2345, 2).unwrap();
    let session = Session::new(config).unwrap();
    let conn = Connection::new(&session).await?;
    assert!(conn.live());
    Ok(())
}

fn main() {
    println!("{}", 65u8 as char)
}

Upvotes: 16

Views: 17300

Answers (1)

Shepmaster

As the error message states:

This happens when a runtime is dropped from within an asynchronous context

You have created nested runtimes:

  1. From tokio::test
  2. From runtime::Builder::new_multi_thread

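The second runtime is owned by Session, which is dropped at the end of the asynchronous test, while the runtime created by tokio::test is still executing that test. Dropping a runtime blocks the current thread while it shuts down, and blocking is not allowed on a thread that is driving asynchronous code, hence the panic. You can observe this by skipping the destructor using mem::forget: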

#[tokio::test]
async fn test_connect_without_database() -> Result<(), Box<dyn std::error::Error>> {
    let config = Configuration::new("rust", "", "", "127.0.0.1", 2345, 2).unwrap();
    let session = Session::new(config).unwrap();
    // Note that the connection and the assert were removed!
    std::mem::forget(session);

    Ok(())
}

Don't spawn nested runtimes and don't drop one runtime from within another.

Upvotes: 17
