Dominykas Mostauskis

Reputation: 8135

What might cause a difficult-to-reproduce truncation of a Hyper HTTP response?

I am experiencing a bug where my Hyper HTTP response is being truncated to a specific size (7829 bytes). Making the same request with cURL works fine.

The request queries a JSON endpoint for data. The response struct is then shuffled around a lot, because a relatively complex rate-limiting procedure is used to make a number of these requests at once. However, the response is truncated even when only a single request is made.

Before I implemented rate limiting and did some heavy refactoring, the program received these responses correctly.

I made the minimal example below, but it fails to reproduce the problem. At this point I'm not sure where to look. The codebase is moderately complicated, and iteratively expanding the reproduction example is difficult, especially since I don't know what might be causing this.

What are some ways that Hyper's Response body might get truncated? The response body is acquired as in the handle function below.

#![feature(use_nested_groups)]
extern crate futures;
extern crate hyper;
extern crate hyper_tls;
extern crate tokio_core;

use futures::{Future, Stream};
use hyper::{Body, Chunk, Client, Method, Request, Response};
use hyper_tls::HttpsConnector;
use tokio_core::reactor::Core;
use std::env;

fn main() {
    let mut core = Core::new().unwrap();
    let client = Client::configure()
        .connector(HttpsConnector::new(4, &core.handle()).unwrap())
        .build(&core.handle());

    // Concatenate the response body and resolve to its total length.
    fn handle(response: Response<Body>) -> Box<Future<Item = usize, Error = hyper::Error>> {
        Box::new(
            response
                .body()
                .concat2()
                .map(move |body: Chunk| -> usize { body.len() }),
        )
    }

    let args: Vec<String> = env::args().collect();
    let uri = &args[1];
    let req = Request::new(Method::Get, uri.parse().unwrap());

    // The request and the body-read both run to completion on this Core.
    let response_body_length = {
        let future = Box::new(client.request(req).map(handle).flatten());
        core.run(future).unwrap()
    };

    println!("response body length: {}", response_body_length);
}

Offending code:

extern crate serde;
extern crate serde_json;
use futures::{future, stream, Future, Stream};
use hyper;
use hyper::{client, Body, Chunk, Client, Headers, Method, Request, Response, header::Accept,
            header::Date as DateHeader, header::RetryAfter};
use hyper_tls::HttpsConnector;
use tokio_core::reactor::Core;
use models::Bucket;
use std::thread;
use std::time::{Duration, UNIX_EPOCH};
use std::str;

header! { (XRateLimitRemaining, "x-ratelimit-remaining") => [String] }

#[derive(Debug)]
struct Uri(pub String);

const MAX_REQ_SIZE: u32 = 500;

fn make_uri(symbol: &str, page_ix: u32) -> Uri {
    Uri(format!(
        "https://www.bitmex.com/api/v1/trade/bucketed?\
         symbol={symbol}&\
         columns={columns}&\
         partial=false&\
         reverse=true&\
         binSize={bin_size}&\
         count={count}&\
         start={start}",
        symbol = symbol,
        columns = "close,timestamp",
        bin_size = "5m",
        count = MAX_REQ_SIZE,
        start = MAX_REQ_SIZE * page_ix
    ))
}

#[derive(Debug)]
struct RateLimitInfo {
    remaining_reqs: u32,
    retry_after: Option<Duration>,
}

impl RateLimitInfo {
    fn default() -> RateLimitInfo {
        RateLimitInfo {
            remaining_reqs: 1,
            retry_after: None,
        }
    }
    fn from<T>(resp: &Response<T>) -> RateLimitInfo {
        let headers = resp.headers();
        let remaining_reqs = headers
            .get::<XRateLimitRemaining>()
            .unwrap_or_else(|| panic!("x-ratelimit-remaining not on response."))
            .parse()
            .unwrap();
        let retry_after = match headers.get::<RetryAfter>() {
            Some(RetryAfter::Delay(duration)) => Some(*duration),
            _ => None,
        };
        RateLimitInfo {
            remaining_reqs,
            retry_after,
        }
    }
}

fn resp_dated_later<'a>(a: &'a Response<Body>, b: &'a Response<Body>) -> &'a Response<Body> {
    let get_date = |resp: &Response<Body>| {
        let headers: &Headers = resp.headers();
        **headers.get::<DateHeader>().unwrap()
    };
    if get_date(&a) > get_date(&b) {
        a
    } else {
        b
    }
}

#[derive(Debug)]
struct Query {
    uri: Uri,
    response: Option<Response<Body>>,
}

impl Query {
    fn from_uri(uri: Uri) -> Query {
        Query {
            uri: uri,
            response: None,
        }
    }
}

fn query_good(q: &Query) -> bool {
    match &q.response {
        Some(response) => response.status().is_success(),
        _ => false,
    }
}

type HttpsClient = hyper::Client<HttpsConnector<client::HttpConnector>>;

type FutureQuery = Box<Future<Item = Query, Error = hyper::Error>>;

fn to_future(x: Query) -> FutureQuery {
    Box::new(future::ok(x))
}

fn exec_if_needed(client: &HttpsClient, query: Query) -> FutureQuery {
    fn exec(client: &HttpsClient, q: Query) -> FutureQuery {
        println!("exec: {:?}", q);
        let uri = q.uri;
        let req = {
            let mut req = Request::new(Method::Get, uri.0.parse().unwrap());
            req.headers_mut().set(Accept::json());
            req
        };
        Box::new(
            client
                .request(req)
                .inspect(|resp| println!("HTTP {}", resp.status()))
                .map(|resp| Query {
                    uri: uri,
                    response: Some(resp),
                }),
        )
    }
    if query_good(&query) {
        to_future(query)
    } else {
        exec(client, query)
    }
}

type BoxedFuture<T> = Box<Future<Item = T, Error = hyper::Error>>;

fn do_batch(client: &HttpsClient, queries: Vec<Query>) -> BoxedFuture<Vec<Query>> {
    println!("do_batch() {} queries", queries.len());
    let exec_if_needed = |q| exec_if_needed(client, q);
    let futures = queries.into_iter().map(exec_if_needed);
    println!("do_batch() futures {:?}", futures);
    Box::new(
        stream::futures_ordered(futures).collect(), //future::join_all(futures)
    )
}

fn take<T>(right: &mut Vec<T>, suggested_n: usize) -> Vec<T> {
    // Drain at most `suggested_n` elements from the front of `right`.
    let n = right.len().min(suggested_n);
    right.drain(0..n).collect()
}

type BoxedResponses = Box<Vec<Response<Body>>>;

fn batched_throttle(uris: Vec<Uri>) -> BoxedResponses {
    println!("batched_throttle({} uris)", uris.len());
    let mut core = Core::new().unwrap();
    let client = Client::configure()
        .connector(HttpsConnector::new(4, &core.handle()).unwrap())
        .build(&core.handle());

    let mut rate_limit_info = RateLimitInfo::default();

    let mut queries_right: Vec<Query> = uris.into_iter().map(Query::from_uri).collect();

    loop {
        let mut queries_left: Vec<Query> = Vec::with_capacity(queries_right.len());

        println!("batched_throttle: starting inner loop");
        loop {
            // throttle program during testing
            thread::sleep(Duration::from_millis(800));
            println!("batched_throttle: {:?}", rate_limit_info);
            if let Some(retry_after) = rate_limit_info.retry_after {
                println!("batched_throttle: retrying after {:?}", retry_after);
                thread::sleep(retry_after)
            }
            if queries_right.is_empty() {
                break;
            }
            let mut queries_mid = {
                let ri_count = rate_limit_info.remaining_reqs;
                let iter_req_count = if ri_count == 0 { 1 } else { ri_count };
                println!("batched_throttle: iter_req_count {}", iter_req_count);
                take(&mut queries_right, iter_req_count as usize)
            };
            println!(
                "batched_throttle: \
                 queries_right.len() {}, \
                 queries_left.len() {}, \
                 queries_mid.len() {})",
                queries_right.len(),
                queries_left.len(),
                queries_mid.len()
            );
            if queries_mid.iter().all(query_good) {
                println!("batched_throttle: queries_mid.iter().all(query_good)");
                continue;
            }
            queries_mid = { core.run(do_batch(&client, queries_mid)).unwrap() };
            rate_limit_info = {
                let create_very_old_response =
                    || Response::new().with_header(DateHeader(UNIX_EPOCH.into()));
                let very_old_response = create_very_old_response();
                let last_resp = queries_mid
                    .iter()
                    .map(|q| match &q.response {
                        Some(r) => r,
                        _ => panic!("Impossible"),
                    })
                    .fold(&very_old_response, resp_dated_later);
                RateLimitInfo::from(&last_resp)
            };
            queries_left.append(&mut queries_mid);
        }

        queries_right = queries_left;

        if queries_right.iter().all(query_good) {
            break;
        }
    }

    println!(
        "batched_throttle: finishing. queries_right.len() {}",
        queries_right.len()
    );

    Box::new(
        queries_right
            .into_iter()
            .map(|q| q.response.unwrap())
            .collect(),
    )
}

fn bucket_count_to_req_count(bucket_count: u32) -> u32 {
    // Number of MAX_REQ_SIZE-sized pages needed to cover `bucket_count` buckets.
    (bucket_count as f32 / MAX_REQ_SIZE as f32).ceil() as u32
}

type BoxedBuckets = Box<Vec<Bucket>>;

fn response_to_buckets(response: Response<Body>) -> BoxedFuture<Vec<Bucket>> {
    Box::new(response.body().concat2().map(|body: Chunk| -> Vec<Bucket> {
        println!("body.len(): {}", body.len());
        println!("JSON: {}", str::from_utf8(&body).unwrap());
        serde_json::from_slice(&body).unwrap()
    }))
}

pub fn get_n_last(symbol: &str, bucket_count: u32) -> BoxedBuckets {
    let req_count = bucket_count_to_req_count(bucket_count);
    let uris = (0..req_count)
        .map(|page_ix| make_uri(symbol, page_ix))
        .collect();

    let responses = batched_throttle(uris);

    let mut core = Core::new().unwrap();
    let boxed_buckets = {
        let futures = responses.into_iter().map(response_to_buckets);
        let future = stream::futures_ordered(futures).collect();
        let groups_of_buckets = core.run(future).unwrap();
        Box::new(
            groups_of_buckets
                .into_iter()
                .flat_map(|bs| bs.into_iter())
                .rev()
                .collect(),
        )
    };

    return boxed_buckets;
}

Upvotes: 1

Views: 704

Answers (1)

Stefan

Reputation: 5530

You first create a Core, start lots of requests on it, and gather the Response "results".

Only after you have gathered all the Responses do you start a new Core and try to read the data from them - but by then the server has probably closed those connections due to write timeouts, and you only get partial data.

You shouldn't keep the server waiting; start reading the Responses as soon as possible.
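
For illustration, here is a minimal sketch of that approach, assuming the same hyper 0.11 / tokio_core setup as in the question (the URL is a placeholder). The body is concatenated in the same future chain that performs the request, so the connection is drained before the server can close it:

extern crate futures;
extern crate hyper;
extern crate hyper_tls;
extern crate tokio_core;

use futures::{Future, Stream};
use hyper::{Chunk, Client, Method, Request};
use hyper_tls::HttpsConnector;
use tokio_core::reactor::Core;

fn main() {
    let mut core = Core::new().unwrap();
    let client = Client::configure()
        .connector(HttpsConnector::new(4, &core.handle()).unwrap())
        .build(&core.handle());

    let req = Request::new(Method::Get, "https://www.example.com".parse().unwrap());

    // The request and the body-read happen in one chain; no Response
    // is stored around for a later Core to consume.
    let future = client
        .request(req)
        .and_then(|resp| resp.body().concat2());

    let body: Chunk = core.run(future).unwrap();
    println!("body length: {}", body.len());
}

Applied to batched_throttle, this would mean storing the already-concatenated Chunk (or the parsed Vec<Bucket>) in Query instead of an unread Response<Body>, so no second Core is needed to drain the bodies.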

Upvotes: 3
