Reputation: 23
I'm trying to run a hyper server with an asynchronous response on a request using a future. When the future's poll
method is called and returns Async::NotReady
, the connection is just dropped ("dropping I/O source: 0"). I expected that the poll
method is called multiple times until it returns Async::Ready
.
The example shown returns an async io future which is doing (I guess) the same thing.
Why is the future's poll
function just called once and why does hyper drop the connection after the future returns Async::NotReady
?
Example Code: (hyper version is: v0.12.21)
use futures::{Async, Future, Poll};
use hyper::http::{Request, Response};
use hyper::service::service_fn;
use hyper::{Body, Server};
fn main() {
let addr = ([127, 0, 0, 1], 3335).into();
println!("Start request handler. (Listening on http://{})", addr);
hyper::rt::run(
Server::bind(&addr)
.serve(|| service_fn(|request: Request<Body>| handle_request(request.uri().path())))
.map_err(|e| println!("server error: {}", e)),
);
}
type BoxedResponseFuture = Box<Future<Item = Response<Body>, Error = tokio::io::Error> + Send>;
fn handle_request(path: &str) -> BoxedResponseFuture {
println!("Handle request {:?}", path);
Box::new(
ResponseFuture { ready: false }
.and_then(|_| {
let response = Response::new(Body::from("Success".to_string()));
Ok(response)
})
.or_else(|e| {
let response = Response::new(Body::from(format!("Error: {:?}", e)));
Ok(response)
}),
)
}
struct ResponseFuture {
ready: bool,
}
impl Future for ResponseFuture {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
println!("Poll future");
if self.ready {
println!("Future ready");
return Ok(Async::Ready(()));
}
println!("Future not ready");
self.ready = true;
Ok(Async::NotReady)
}
}
Upvotes: 2
Views: 710
Reputation: 1826
Hyper is built on top of the futures crate and uses its future model known as a "readiness" or "pull" where values are pulled out of futures on demand, and otherwise a task is notified when a value might be ready to get pulled out.
When poll
returns NotReady
, the current task must register for a readiness change notification, otherwise the task may never be completed. Any function that returns Async
must adhere to it.
In other words, you should wait until poll
can return Ready
or notify the current task to indicate that it is ready to make progress and return NotReady
// notify about progress
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
println!("Poll future");
if self.ready {
println!("Future ready");
return Ok(Async::Ready(()));
}
println!("Future not ready");
self.ready = true;
// The executor will poll this task next iteration
futures::task::current().notify();
Ok(Async::NotReady)
}
// wait until it is Ready
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
println!("Poll future");
if self.ready {
println!("Future ready");
return Ok(Async::Ready(()));
}
println!("Future not ready");
self.ready = true;
}
}
Tokio's docs 1 2 might clarify it.
Upvotes: 1