Brian Yeh

Reputation: 3267

Rust warp server issue when passing a handler with a generic

main.rs:

use async_trait::async_trait;
use tokio::runtime::Runtime;
use warp::Filter;

fn main() {
    //create http server

    let state = CState {
        inner: 2
    };

    Runtime::new().unwrap().block_on(async move {
        run_server(state.clone()).await;
    });
}

trait TaskType {
    fn create_task(id: Option<u64>, name: &str, time: u64) -> Self;
}

#[async_trait]
trait State<T: TaskType>: Clone {
    async fn add_task(&self, task: T) -> u64;
}

#[derive(Clone)]
struct CState {
    inner: u64,
}

#[derive(Clone)]
struct CTask {
    inner: u64,
}

impl TaskType for CTask {
    fn create_task(id: Option<u64>, name: &str, time: u64) -> Self {
        CTask{
            inner: 2
        }
    }
}

#[async_trait]
impl<T: TaskType> State<T> for CState {
    async fn add_task(&self, task: T) -> u64 {
        self.inner
    }
}


async fn run_server<T: TaskType, U: State<T>>(state: U) {
    let warp_state = warp::any().map(move || {
        state.clone()
    });
    async fn post_new_task_handler<T: TaskType, U: State<T>>(task_type_str: String, time: u64, state: U) -> Result<impl warp::Reply, warp::Rejection> {
        let task = T::create_task(None, task_type_str.as_str(), time);
        let id = state.add_task(task).await;
        Ok(warp::reply::json(&id))
    }
    let post_new_task_route = warp::post()
        .and(warp::path!(String / u64))
        .and(warp::path::end())
        .and(warp_state.clone())
        .and_then(post_new_task_handler::<T, U>);
    let router = post_new_task_route;
    warp::serve(router).run(([127, 0, 0, 1], 3030)).await;
}

Cargo.toml:

[package]
name = "minimal_warp"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1", features = ["full"] }
warp = "0.3"
async-trait = "0.1.68"
serde = "*"
serde_json = "*"
serde_derive = "*"

For the above code I get an error from the Rust compiler that seems to indicate I'm missing some trait bounds on the generics. However, it is not clear which bounds I need to add. If I write a non-generic version of the code, it works. What's going on with warp here? What's the proper way to fix it, and how would I diagnose such issues in the future?

To be clear, I think the issue is not in the route handler but in the closure where I pass in the state.

Also note that I have an equivalent version of this code working with concrete types instead of traits, so I know the problem is with the traits.

The exact error message is shown below:

error[E0599]: the method `and_then` exists for struct `warp::filter::and::And<warp::filter::and::And<warp::filter::and::And<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>, warp::filter::and::And<warp::filter::and::And<warp::filter::and::And<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Infallible>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (String,), Error = Rejection>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (u64,), Error = Rejection>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>>, warp::filter::map::Map<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Infallible>, [closure@src/main.rs:53:38: 53:45]>>`, but its trait bounds were not satisfied
  --> src/main.rs:65:10
   |
65 |         .and_then(post_new_task_handler::<T, U>);
   |          ^^^^^^^^ method cannot be called on `warp::filter::and::And<warp::filter::and::And<warp::filter::and::And<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>, warp::filter::and::And<warp::filter::and::And<warp::filter::and::And<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Infallible>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (String,), Error = Rejection>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (u64,), Error = Rejection>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>>, warp::filter::map::Map<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Infallible>, [closure@src/main.rs:53:38: 53:45]>>` due to unsatisfied trait bounds
   |
  ::: /home/brian/.cargo/registry/src/github.com-1ecc6299db9ec823/warp-0.3.5/src/filter/and.rs:13:1
   |
13 | pub struct And<T, U> {
   | --------------------
   | |
   | doesn't satisfy `_: warp::Filter`
   | doesn't satisfy `_: warp::filter::FilterBase`
   |
   = note: the following trait bounds were not satisfied:
           `warp::filter::and::And<warp::filter::and::And<warp::filter::and::And<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>, warp::filter::and::And<warp::filter::and::And<warp::filter::and::And<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Infallible>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (String,), Error = Rejection>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (u64,), Error = Rejection>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>>, warp::filter::map::Map<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Infallible>, [closure@src/main.rs:53:38: 53:45]>>: warp::filter::FilterBase`
           which is required by `warp::filter::and::And<warp::filter::and::And<warp::filter::and::And<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>, warp::filter::and::And<warp::filter::and::And<warp::filter::and::And<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Infallible>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (String,), Error = Rejection>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (u64,), Error = Rejection>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>>, warp::filter::map::Map<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Infallible>, [closure@src/main.rs:53:38: 53:45]>>: warp::Filter`
           `&warp::filter::and::And<warp::filter::and::And<warp::filter::and::And<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>, warp::filter::and::And<warp::filter::and::And<warp::filter::and::And<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Infallible>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (String,), Error = Rejection>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (u64,), Error = Rejection>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>>, warp::filter::map::Map<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Infallible>, [closure@src/main.rs:53:38: 53:45]>>: warp::filter::FilterBase`
           which is required by `&warp::filter::and::And<warp::filter::and::And<warp::filter::and::And<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>, warp::filter::and::And<warp::filter::and::And<warp::filter::and::And<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Infallible>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (String,), Error = Rejection>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (u64,), Error = Rejection>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>>, warp::filter::map::Map<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Infallible>, [closure@src/main.rs:53:38: 53:45]>>: warp::Filter`
           `&mut warp::filter::and::And<warp::filter::and::And<warp::filter::and::And<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>, warp::filter::and::And<warp::filter::and::And<warp::filter::and::And<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Infallible>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (String,), Error = Rejection>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (u64,), Error = Rejection>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>>, warp::filter::map::Map<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Infallible>, [closure@src/main.rs:53:38: 53:45]>>: warp::filter::FilterBase`
           which is required by `&mut warp::filter::and::And<warp::filter::and::And<warp::filter::and::And<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>, warp::filter::and::And<warp::filter::and::And<warp::filter::and::And<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Infallible>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (String,), Error = Rejection>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (u64,), Error = Rejection>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>>, warp::filter::map::Map<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Infallible>, [closure@src/main.rs:53:38: 53:45]>>: warp::Filter`

Edit: This was changed to make the code a minimal running example.

Upvotes: 0

Views: 197

Answers (1)

Caesar

Reputation: 8544

Sprinkling some + Send + Sync + 'static seems to do the trick…

use async_trait::async_trait;
use tokio::runtime::Runtime;
use warp::Filter;

fn main() {
    //create http server

    let state = CState { inner: 2 };

    Runtime::new().unwrap().block_on(async move {
        run_server::<CTask, CState>(state.clone()).await;
    });
}

trait TaskType {
    fn create_task(id: Option<u64>, name: &str, time: u64) -> Self;
}

#[async_trait]
trait State<T: TaskType>: Clone {
    async fn add_task(&self, task: T) -> u64;
}

#[derive(Clone)]
struct CState {
    inner: u64,
}

#[derive(Clone)]
struct CTask {
    inner: u64,
}

impl TaskType for CTask {
    fn create_task(id: Option<u64>, name: &str, time: u64) -> Self {
        CTask { inner: 2 }
    }
}

#[async_trait]
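// `task: T` is moved into the boxed `Send` future that #[async_trait] generates,
// so `T` needs `Send`; `'static` covers the `'async_trait` lifetime the macro introduces.
impl<T: TaskType + Send + 'static> State<T> for CState {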
    async fn add_task(&self, task: T) -> u64 {
        self.inner
    }
}

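// The state filter and the handler's future are handed to warp, which needs
// them to be thread-safe; hence the extra bounds on `T` and `U`.
async fn run_server<T, U>(state: U)
where
    T: TaskType + Send + 'static,
    U: State<T> + Send + Sync + 'static,
{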
    let warp_state = warp::any().map(move || state.clone());
    async fn post_new_task_handler<T: TaskType, U: State<T>>(
        task_type_str: String,
        time: u64,
        state: U,
    ) -> Result<impl warp::Reply, warp::Rejection> {
        let task = T::create_task(None, task_type_str.as_str(), time);
        let id = state.add_task(task).await;
        Ok(warp::reply::json(&id))
    }
    let post_new_task_route = warp::post()
        .and(warp::path!(String / u64))
        .and(warp::path::end())
        .and(warp_state.clone())
        .and_then(post_new_task_handler::<T, U>);
    let router = post_new_task_route;
    warp::serve(router).run(([127, 0, 0, 1], 3030)).await;
}

I can't explain why this works; the solution was found purely by trial and error:

  • I did get a few error messages from your #[async_trait] impl State for CState, related to Send and a lifetime 'async_trait (so nothing to do with warp at all). Following the compiler's suggestions there just led to more error messages. So the first "leap" of intuition was to add T: Send + 'static. The Send was suggested by rustc, but the 'static is just based on the rule of thumb "If you have trouble with lifetimes, first try '_, then try adding a parameter, then try 'static. If that doesn't work, start thinking."
  • From there on, U: Send + Sync + 'static was a complete guess out of the blue, because the warp-related error message is impossible for humans to parse. I did at one point remove the U parameter from post_new_task_handler and change it to some benign type (i32) to confirm that U really is the problem. After that it was more of a "Hmm. We just had problems with traits and futures and Send. Lemme just try…"
    The background knowledge here is that async fn-generated futures are state machines that save all parameters and variables at each .await (and at the start of the function). Oversimplifying, you can just think of fn post_new_task_handler as an
    enum PostTaskHandlerFuture<U, T> {
        Start { task_type_str: String, time: u64, state: U },
        AwaitAtLine2 { poll: /* type for U::add_task's future */ },
        Return { /* warp::reply::… */ },
        Finished
    }
    
    This state machine gets executed on tokio (mediated through warp's magic, which is where the nice error messages get lost). For tokio's work-stealing scheduler to be able to distribute work such as executing PostTaskHandlerFuture across threads, PostTaskHandlerFuture needs to be Send, and for that, U needs to be Send. (I have no idea why Sync is necessary.) This kind of "make some generic parameter Send/Sync" fix is common when writing generic async code, which is why I thought of trying it.
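
The Send reasoning above can be sketched with std alone, and the same sketch shows one way to get a readable error when warp's is not: hand the suspect value to a tiny probe function whose only job is to demand a bound. The names is_send, is_shareable, pause, handler, state_closure_is_shareable and handler_future_is_send are made up for illustration and are not part of warp or tokio. The Sync part is an assumption rather than something verified against warp here: a closure that owns a U is Sync only if U is, so a server that requires a Sync filter would force U: Sync.

```rust
// Hypothetical probes (not warp or tokio APIs): each one only type-checks its
// argument, so a missing bound shows up as a short, direct compiler error.
fn is_send<T: Send>(_: &T) -> bool {
    true
}

fn is_shareable<T: Send + Sync + 'static>(_: &T) -> bool {
    true
}

// An await point with nothing to wait for.
async fn pause() {}

// Stand-in for the route handler: `state` is stored inside the future across
// the await, so the future is `Send` only if `U` is `Send`.
async fn handler<U>(state: U) -> U {
    pause().await;
    state
}

// Mirrors `warp::any().map(move || state.clone())`: the closure owns a `U`,
// so it is `Send`, `Sync` and `'static` only when `U` is.
fn state_closure_is_shareable<U: Clone + Send + Sync + 'static>(state: U) -> bool {
    let provide_state = move || state.clone();
    is_shareable(&provide_state)
}

// Dropping `Send` from this bound turns the snippet into a compile error
// that should name `U` directly instead of a nested filter type.
fn handler_future_is_send<U: Send>(state: U) -> bool {
    is_send(&handler(state))
}
```

Both check functions return true whenever they compile, because the check happens entirely at compile time. Applying the same kind of probe to the state closure or to the handler's future in the original program should surface the missing bound as a plain "U cannot be sent between threads safely" style message.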

Upvotes: 1
