1

Hyper HTTP 応答が特定のサイズ (7829 バイト) に切り捨てられるというバグが発生しています。cURL で同じリクエストを行うと、問題なく動作します。

このリクエストは JSON エンドポイントにデータを問い合わせます。多数のリクエストを一度に行うために比較的複雑なレート制限の手順を使っているため、レスポンス構造体はその後、何度も受け渡されます。ただし、リクエストが 1 つしかない場合でも、レスポンスは切り捨てられます。

レート制限を実装して大規模なリファクタリングを行う前は、プログラムはこれらのレスポンスを正しく処理できていました。

以下の最小限の例を作成しましたが、問題を再現できません。この時点で、どこを見ればよいかわかりません。コードベースはそれなりに複雑で、再現例を少しずつ拡張していくのは困難です。特に、何が原因になり得るのかわからない場合はなおさらです。

Hyper の Response のボディが切り詰められる原因としては、どのようなものが考えられますか? レスポンスボディは、以下の `handle` 関数で取得しています。

#![feature(use_nested_groups)]
extern crate futures;
extern crate hyper;
extern crate hyper_tls;
extern crate tokio_core;

use futures::{Future, Stream};
use hyper::{Body, Chunk, Client, Method, Request, Response};
use hyper_tls::HttpsConnector;
use tokio_core::reactor::Core;
use std::env;

/// Fetches the URI given as the first command-line argument and prints the
/// length in bytes of the complete response body.
///
/// Exits with status 2 and a usage message when no argument is supplied.
///
/// # Panics
///
/// Panics if the reactor or TLS connector cannot be created, if the argument
/// is not a valid URI, or if the request or body read fails.
fn main() {
    // Validate the command line first so a missing argument yields a usage
    // message instead of an index-out-of-bounds panic.
    let uri = match env::args().nth(1) {
        Some(arg) => arg,
        None => {
            eprintln!("usage: pass the URI to fetch as the first argument");
            ::std::process::exit(2);
        }
    };

    let mut core = Core::new().unwrap();
    let client = Client::configure()
        .connector(HttpsConnector::new(4, &core.handle()).unwrap())
        .build(&core.handle());

    /// Concatenates every chunk of the body and resolves to its total length.
    fn handle(response: Response<Body>) -> Box<Future<Item = usize, Error = hyper::Error>> {
        Box::new(response.body().concat2().map(|body: Chunk| body.len()))
    }

    let req = Request::new(
        Method::Get,
        uri.parse().expect("argument is not a valid URI"),
    );

    // The body is drained on the same `Core` that drives the connection.
    let response_body_length = core.run(client.request(req).and_then(handle)).unwrap();

    println!("response body length: {}", response_body_length);
}

問題のコード:

extern crate serde;
extern crate serde_json;
use futures::{future, stream, Future, Stream};
use hyper;
use hyper::{client, Body, Chunk, Client, Headers, Method, Request, Response, header::Accept,
            header::Date as DateHeader, header::RetryAfter};
use hyper_tls::HttpsConnector;
use tokio_core::reactor::Core;
use models::Bucket;
use std::thread;
use std::time::{Duration, UNIX_EPOCH};
use std::str;

// Declares the typed header `XRateLimitRemaining` for the
// `x-ratelimit-remaining` field, carrying its value as a `String`.
// NOTE(review): presumably hyper's `header!` macro; the `#[macro_use]` that
// brings it into scope is not visible in this chunk.
header! { (XRateLimitRemaining, "x-ratelimit-remaining") => [String] }

/// Request URI kept in string form; it is parsed into a request URI only when
/// the request is built. Local newtype, distinct from any URI type in hyper.
#[derive(Debug)]
struct Uri(pub String);

/// Number of buckets requested per page: sent as the `count` query parameter,
/// used as the stride between page `start` offsets, and used as the divisor
/// when computing how many requests are needed.
const MAX_REQ_SIZE: u32 = 500;

/// Builds the bucketed-trade endpoint URI for page `page_ix` of `symbol`.
///
/// Each page asks for `MAX_REQ_SIZE` five-minute buckets (columns `close` and
/// `timestamp`, newest first, partial buckets excluded), starting at offset
/// `MAX_REQ_SIZE * page_ix`.
fn make_uri(symbol: &str, page_ix: u32) -> Uri {
    let offset = MAX_REQ_SIZE * page_ix;
    let query = [
        format!("symbol={}", symbol),
        String::from("columns=close,timestamp"),
        String::from("partial=false"),
        String::from("reverse=true"),
        String::from("binSize=5m"),
        format!("count={}", MAX_REQ_SIZE),
        format!("start={}", offset),
    ]
    .join("&");
    Uri(format!(
        "https://www.bitmex.com/api/v1/trade/bucketed?{}",
        query
    ))
}

/// Rate-limit state derived from a response's headers.
#[derive(Debug)]
struct RateLimitInfo {
    /// Value of the `x-ratelimit-remaining` header, parsed as an integer.
    remaining_reqs: u32,
    /// Delay taken from a `Retry-After` header in its duration form, if any.
    retry_after: Option<Duration>,
}

impl Default for RateLimitInfo {
    /// Starting state used before any response has been seen: one request is
    /// allowed and there is no delay to wait for.
    fn default() -> RateLimitInfo {
        RateLimitInfo {
            remaining_reqs: 1,
            retry_after: None,
        }
    }
}

impl RateLimitInfo {
    /// Extracts the rate-limit state from the headers of `resp`.
    ///
    /// `retry_after` is populated only when `Retry-After` carries a delay; any
    /// other form of that header (or its absence) yields `None`.
    ///
    /// # Panics
    ///
    /// Panics if the `x-ratelimit-remaining` header is missing from the
    /// response or its value does not parse as a `u32`.
    fn from<T>(resp: &Response<T>) -> RateLimitInfo {
        let headers = resp.headers();
        let remaining_reqs = headers
            .get::<XRateLimitRemaining>()
            .expect("x-ratelimit-remaining header missing from response")
            .parse()
            .expect("x-ratelimit-remaining header is not a valid u32");
        let retry_after = match headers.get::<RetryAfter>() {
            Some(RetryAfter::Delay(duration)) => Some(*duration),
            _ => None,
        };
        RateLimitInfo {
            remaining_reqs,
            retry_after,
        }
    }
}

/// Returns whichever of the two responses carries the later `Date` header;
/// `b` is returned when `a` is not strictly later.
///
/// # Panics
///
/// Panics if either response has no `Date` header.
fn resp_dated_later<'a>(a: &'a Response<Body>, b: &'a Response<Body>) -> &'a Response<Body> {
    // Copies the date value out of the `Date` header.
    let date_of = |resp: &Response<Body>| **resp.headers().get::<DateHeader>().unwrap();
    let date_a = date_of(a);
    let date_b = date_of(b);
    if date_a > date_b {
        return a;
    }
    b
}

/// A URI to fetch together with the most recent response received for it.
#[derive(Debug)]
struct Query {
    /// Target of the request.
    uri: Uri,
    /// `None` until the query has been executed at least once.
    response: Option<Response<Body>>,
}

impl Query {
    /// Creates a query for `uri` that has not been executed yet.
    fn from_uri(uri: Uri) -> Query {
        let response = None;
        Query { uri, response }
    }
}

/// Reports whether the query already holds a response with a success status.
/// A query without a response is never good.
fn query_good(q: &Query) -> bool {
    q.response
        .as_ref()
        .map_or(false, |resp| resp.status().is_success())
}

/// hyper client whose connector is an `HttpsConnector` wrapping the plain
/// `HttpConnector`.
type HttpsClient = hyper::Client<HttpsConnector<client::HttpConnector>>;

/// Boxed future resolving to a `Query`, failing with `hyper::Error`.
type FutureQuery = Box<Future<Item = Query, Error = hyper::Error>>;

fn to_future(x: Query) -> FutureQuery {
    Box::new(future::ok(x))
}

/// Returns the query unchanged when it already holds a successful response;
/// otherwise issues a GET for its URI (with `Accept: application/json`) and
/// resolves to a query holding the new response.
///
/// Any previous, unsuccessful response held by the query is discarded.
///
/// # Panics
///
/// Panics if the query's URI string cannot be parsed.
fn exec_if_needed(client: &HttpsClient, query: Query) -> FutureQuery {
    // Nothing to do for queries that already succeeded.
    if query_good(&query) {
        return to_future(query);
    }

    println!("exec: {:?}", query);
    let Query { uri, .. } = query;

    let mut request = Request::new(Method::Get, uri.0.parse().unwrap());
    request.headers_mut().set(Accept::json());

    let pending = client
        .request(request)
        .inspect(|resp| println!("HTTP {}", resp.status()))
        .map(move |resp| Query {
            uri,
            response: Some(resp),
        });
    Box::new(pending)
}

/// Boxed future resolving to `T`, failing with `hyper::Error`.
type BoxedFuture<T> = Box<Future<Item = T, Error = hyper::Error>>;

/// Starts every query in `queries` that still needs executing and resolves to
/// the queries in their original order once all of them have completed.
fn do_batch(client: &HttpsClient, queries: Vec<Query>) -> BoxedFuture<Vec<Query>> {
    println!("do_batch() {} queries", queries.len());
    let pending = queries
        .into_iter()
        .map(|query| exec_if_needed(client, query));
    println!("do_batch() futures {:?}", pending);
    let collected = stream::futures_ordered(pending).collect();
    Box::new(collected)
}

/// Removes up to `suggested_n` elements from the front of `right` and returns
/// them in order. When `right` holds fewer elements, all of them are taken.
fn take<T>(right: &mut Vec<T>, suggested_n: usize) -> Vec<T> {
    let count = suggested_n.min(right.len());
    right.drain(..count).collect()
}

/// Heap-allocated list of responses, as returned by `batched_throttle`.
type BoxedResponses = Box<Vec<Response<Body>>>;

fn batched_throttle(uris: Vec<Uri>) -> BoxedResponses {
    println!("batched_throttle({} uris)", uris.len());
    let mut core = Core::new().unwrap();
    let client = Client::configure()
        .connector(HttpsConnector::new(4, &core.handle()).unwrap())
        .build(&core.handle());

    let mut rate_limit_info = RateLimitInfo::default();

    let mut queries_right: Vec<Query> = uris.into_iter().map(Query::from_uri).collect();

    loop {
        let mut queries_left: Vec<Query> = Vec::with_capacity(queries_right.len());

        println!("batched_throttle: starting inner loop");
        loop {
            // throttle program during testing
            thread::sleep(Duration::from_millis(800));
            println!("batched_throttle: {:?}", rate_limit_info);
            if let Some(retry_after) = rate_limit_info.retry_after {
                println!("batched_throttle: retrying after {:?}", retry_after);
                thread::sleep(retry_after)
            }
            if queries_right.is_empty() {
                break;
            }
            let mut queries_mid = {
                let ri_count = rate_limit_info.remaining_reqs;
                let iter_req_count = if ri_count == 0 { 1 } else { ri_count };
                println!("batched_throttle: iter_req_count {}", iter_req_count);
                take(&mut queries_right, iter_req_count as usize)
            };
            println!(
                "batched_throttle: \
                 queries_right.len() {}, \
                 queries_left.len() {}, \
                 queries_mid.len() {})",
                queries_right.len(),
                queries_left.len(),
                queries_mid.len()
            );
            if queries_mid.iter().all(query_good) {
                println!("batched_throttle: queries_mid.iter().all(query_good)");
                continue;
            }
            queries_mid = { core.run(do_batch(&client, queries_mid)).unwrap() };
            rate_limit_info = {
                let create_very_old_response =
                    || Response::new().with_header(DateHeader(UNIX_EPOCH.into()));
                let very_old_response = create_very_old_response();
                let last_resp = queries_mid
                    .iter()
                    .map(|q| match &q.response {
                        Some(r) => r,
                        _ => panic!("Impossible"),
                    })
                    .fold(&very_old_response, resp_dated_later);
                RateLimitInfo::from(&last_resp)
            };
            &queries_left.append(&mut queries_mid);
        }

        queries_right = queries_left;

        if queries_right.iter().all(query_good) {
            break;
        }
    }

    println!(
        "batched_throttle: finishing. queries_right.len() {}",
        queries_right.len()
    );

    Box::new(
        queries_right
            .into_iter()
            .map(|q| q.response.unwrap())
            .collect(),
    )
}

fn bucket_count_to_req_count(bucket_count: u32) -> u32 {
    let needed_req_count = (bucket_count as f32 / MAX_REQ_SIZE as f32).ceil() as u32;
    return needed_req_count;
}

/// Heap-allocated list of buckets, as returned by `get_n_last`.
type BoxedBuckets = Box<Vec<Bucket>>;

/// Reads the whole body of `response` and deserializes it from JSON into a
/// list of buckets, logging the body length and text along the way.
///
/// # Panics
///
/// The returned future panics when polled if the body is not valid UTF-8 or
/// does not deserialize into `Vec<Bucket>`.
fn response_to_buckets(response: Response<Body>) -> BoxedFuture<Vec<Bucket>> {
    let parse = |body: Chunk| -> Vec<Bucket> {
        let bytes: &[u8] = &body;
        println!("body.len(): {}", bytes.len());
        println!("JSON: {}", str::from_utf8(bytes).unwrap());
        serde_json::from_slice(bytes).unwrap()
    };
    Box::new(response.body().concat2().map(parse))
}

pub fn get_n_last(symbol: &str, bucket_count: u32) -> BoxedBuckets {
    let req_count = bucket_count_to_req_count(bucket_count);
    let uris = (0..req_count)
        .map(|page_ix| make_uri(symbol, page_ix))
        .collect();

    let responses = batched_throttle(uris);

    let mut core = Core::new().unwrap();
    let boxed_buckets = {
        let futures = responses.into_iter().map(response_to_buckets);
        let future = stream::futures_ordered(futures).collect();
        let groups_of_buckets = core.run(future).unwrap();
        Box::new(
            groups_of_buckets
                .into_iter()
                .flat_map(|bs| bs.into_iter())
                .rev()
                .collect(),
        )
    };

    return boxed_buckets;
}
4

1 に答える 1