Jones
Jones

Reputation: 23

Hyper server drops connection on returning Async::NotReady in Future

I'm trying to run a hyper server with an asynchronous response on a request using a future. When the future's poll method is called and returns Async::NotReady, the connection is just dropped ("dropping I/O source: 0"). I expected that the poll method is called multiple times until it returns Async::Ready.

The example shown returns an async io future which is doing (I guess) the same thing.

Why is the future's poll function just called once and why does hyper drop the connection after the future returns Async::NotReady?

Example Code: (hyper version is: v0.12.21)

use futures::{Async, Future, Poll};
use hyper::http::{Request, Response};
use hyper::service::service_fn;
use hyper::{Body, Server};

fn main() {
    let addr = ([127, 0, 0, 1], 3335).into();
    println!("Start request handler. (Listening on http://{})", addr);

    hyper::rt::run(
        Server::bind(&addr)
            .serve(|| service_fn(|request: Request<Body>| handle_request(request.uri().path())))
            .map_err(|e| println!("server error: {}", e)),
    );
}

type BoxedResponseFuture = Box<Future<Item = Response<Body>, Error = tokio::io::Error> + Send>;

fn handle_request(path: &str) -> BoxedResponseFuture {
    println!("Handle request {:?}", path);
    Box::new(
        ResponseFuture { ready: false }
            .and_then(|_| {
                let response = Response::new(Body::from("Success".to_string()));

                Ok(response)
            })
            .or_else(|e| {
                let response = Response::new(Body::from(format!("Error: {:?}", e)));

                Ok(response)
            }),
    )
}

struct ResponseFuture {
    ready: bool,
}

impl Future for ResponseFuture {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        println!("Poll future");

        if self.ready {
            println!("Future ready");

            return Ok(Async::Ready(()));
        }

        println!("Future not ready");
        self.ready = true;
        Ok(Async::NotReady)
    }
}

Upvotes: 2

Views: 710

Answers (1)

Artemij Rodionov
Artemij Rodionov

Reputation: 1826

Hyper is built on top of the futures crate and uses its future model known as a "readiness" or "pull" where values are pulled out of futures on demand, and otherwise a task is notified when a value might be ready to get pulled out.

When poll returns NotReady, the current task must register for a readiness change notification, otherwise the task may never be completed. Any function that returns Async must adhere to it.

In other words, you should wait until poll can return Ready or notify the current task to indicate that it is ready to make progress and return NotReady

// notify about progress
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
    println!("Poll future");

    if self.ready {
        println!("Future ready");

        return Ok(Async::Ready(()));
    }

    println!("Future not ready");
    self.ready = true;

    // The executor will poll this task next iteration
    futures::task::current().notify();
    Ok(Async::NotReady)
}

// wait until it is Ready
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
    loop {
        println!("Poll future");

        if self.ready {
            println!("Future ready");
            return Ok(Async::Ready(()));
        }

        println!("Future not ready");
        self.ready = true;
    }
}

Tokio's docs 1 2 might clarify it.

Upvotes: 1

Related Questions