Reputation: 177
We are working on developing a low latency high throughput client which connects to different exchanges. The main piece of code that actually sends HTTP requests through is this below
// Sends one HTTP request on the persistent keep-alive connection and invokes
// finish_handler with the response, or failure_handler on a transport error.
//
// NOTE(review): every call spawns an independent coroutine against the same
// tcp_stream. Beast/Asio streams permit at most ONE outstanding async_write
// and ONE outstanding async_read at a time; two overlapping send_request
// calls therefore interleave operations on the shared stream, and one
// coroutine can consume (or lose) the response belonging to another request.
// That matches the reported symptom — write logged, read never completes —
// and should be fixed by serializing requests through a queue/strand.
void send_request(http_client::request_parameters request_params, http_client::finish_handler finish_handler, http_client::failure_handler failure_handler) {
    boost::asio::spawn(
        client_context.io_context,
        [request_params = std::move(request_params), finish_handler = std::move(finish_handler), failure_handler = std::move(failure_handler), this](
            boost::asio::yield_context yield_context) mutable
        {
            try {
                auto const request_start_time = blosh::Time::get_current_time_in_nano_seconds();

                // Build "<target>?key1=val1&key2=val2" from the query params.
                std::string request_url(request_params.target);
                if (!request_params.query_params.empty()) {
                    std::vector<std::string> params_strings;
                    for (auto const &param : request_params.query_params) {
                        params_strings.emplace_back(
                            fmt::format("{}={}", param.key().data(), param.value().get_string().data()));
                    }
                    request_url.append("?");
                    request_url.append(boost::algorithm::join(params_strings, "&"));
                }

                // HTTP/1.1 request on a long-lived keep-alive connection.
                boost::beast::http::request<boost::beast::http::string_body> http_request{
                    request_params.verb,
                    request_url,
                    11};
                http_request.set(boost::beast::http::field::host, http_hostname);
                http_request.set(boost::beast::http::field::content_type, "application/json");
                http_request.set(boost::beast::http::field::connection, "Keep-Alive");
                http_request.set(boost::beast::http::field::keep_alive, "timeout=86400");
                http_request.keep_alive(true);
                for (auto const &param : request_params.header_params) {
                    http_request.set(param.first, param.second);
                }

                // Body holds either a JSON object or a JSON array (variant);
                // empty containers produce no body at all.
                if (const boost::json::object *json_object =
                        std::get_if<boost::json::object>(&request_params.body_params)) {
                    if (!json_object->empty()) http_request.body() = boost::json::serialize(*json_object);
                } else if (const boost::json::array *json_array =
                               std::get_if<boost::json::array>(&request_params.body_params)) {
                    if (!json_array->empty()) http_request.body() = boost::json::serialize(*json_array);
                }
                http_request.prepare_payload();

                boost::system::error_code ec;
                boost::beast::http::async_write(tcp_stream.value(), http_request, yield_context[ec]);
                if (ec) {
                    LOG_ERROR << "[http_client] [send_request] async_write failed. error: " << ec.what();
                    failure_handler(std::move(request_params), std::move(finish_handler), http_hostname_index, retries);
                    // BUG FIX: previously fell through and issued async_read on
                    // a stream whose write just failed (and used the handlers
                    // after moving them into failure_handler). Bail out once
                    // the retry has been scheduled.
                    return;
                }
                LOG_INFO << "[http_client] [send_request] method = " << http_request.method_string() << ", target = " << http_request.target() << ", hostname: " << http_hostname
                         << ", id = " << this->get_unique_id() << ", usage = " << this->get_usage_counter();

                // Declare a container to hold the response and empty out the buffer.
                response http_response;
                boost::beast::http::async_read(tcp_stream.value(), receive_flat_buffer, http_response, yield_context[ec]);
                if (ec) {
                    LOG_ERROR << "[http_client] [send_request] async_read failed. error: " << ec.what();
                    // BUG FIX: previously continued and handed an empty/partial
                    // response to finish_handler; route read failures through
                    // the same retry path as write failures instead.
                    failure_handler(std::move(request_params), std::move(finish_handler), http_hostname_index, retries);
                    return;
                }
                connection_latency_ns = static_cast<uint64_t>(blosh::Time::get_current_time_in_nano_seconds() - request_start_time);
                usage += 1;
                finish_handler(http_response);
            } catch (const std::exception& e) {
                LOG_ERROR << "[http_client] [send_request] failure. error: " << e.what() << ", hostname = " << http_hostname << ", id = " << this->get_unique_id();
            }
        }
    );
}
And here's the IO context which runs under another thread under a busy loop
// start both rest and websocket threads
boost::asio::io_context rest_io_context{};
std::thread rest_thread{};
// NOTE(review): this flag is written by the controlling thread and read by
// the polling thread below without synchronization — that is a data race
// (UB), and the compiler is free to hoist the load out of the loop. It
// should be a std::atomic<bool>; flagged rather than changed here because
// the required <atomic> include lives outside this snippet.
bool rest_io_context_run_flag{true};
rest_thread = std::thread([&]{
#ifndef LOCAL_BUILD
    // Pin this thread to a dedicated core so the busy-poll loop below is not
    // migrated across cores; the shared core list is guarded by the mutex.
    cpu_core_mutex.lock();
    if (!cpu_cores.empty()) {
        LOG_INFO << "rest cpu core " << cpu_cores.front();
        blosh::scheduling::pin_core(cpu_cores.front());
        cpu_cores.pop_front();
    }
    cpu_core_mutex.unlock();
#endif
    // Busy-poll instead of run(): poll() returns as soon as the ready queue
    // is drained, and restart() clears the stopped state so the next poll()
    // picks up newly posted handlers. Trades a burned core for latency.
    while (rest_io_context_run_flag) {
        rest_io_context.restart();
        rest_io_context.poll();
    }
});
The reason we went with busy polling instead of .run()
is because of this answer: (Can Boost ASIO be used to build low-latency applications?)
This works well 99% of the time, but when volatility hits and there is lots of market activity, some of the HTTP requests are sent and we never hear anything back from them. We have fallback logic: if we don't hear back within about 5 seconds, we try again — which results in us sending HTTP requests through async_write
but getting no response from async_read
. For example, take a look at these logs, where it logs after async_write
but nothing after that
20230831 17:40:47.110427Z 24792 INFO [http_client] [send_request] method = DELETE, target = /fapi/v1/order?symbol=XXXXXX&origClientOrderId=XXXXX&timestamp=XXXXX&signature=XXXX, hostname: fapi.binance.com, id = 9838412294785007617, usage = 218 - http_client.cpp:129
20230831 17:40:52.210959Z 24792 INFO [http_client] [send_request] method = DELETE, target = /fapi/v1/order?symbol=XXX&origClientOrderId=XXXXX&timestamp=1693503652210&signature=XXXX, hostname: fapi.binance.com, id = XXXX, usage = 220 - http_client.cpp:129
20230831 17:40:57.297817Z 24792 INFO [http_client] [send_request] method = DELETE, target = /fapi/v1/order?symbol=XXXXX&origClientOrderId=XXXXX&timestamp=XXXXX&signature=XXXXX, hostname: fapi.binance.com, id = XXXXX, usage = 128 - http_client.cpp:129
This bug has been frustrating us for the last week or so because in isolated mode, this works super well but when it connects with other systems, that's when the problem appears. I was wondering whether the boost::asio
or boost::beast
community can help me out
Is there anything in boost::asio
that I am not aware of that could cause my TCP writes / TCP reads to be discarded?
Here's my sysctl.conf btw if anyone interested
net.core.wmem_max=134217728
net.core.rmem_max=134217728
net.ipv4.tcp_rmem = 4194304 8388608 134217728
net.ipv4.tcp_wmem = 4194304 8388608 134217728
net.ipv4.tcp_window_scaling = 1
net.ipv4.tcp_no_metrics_save = 1
How do I reliably track that my request actually went through kernel, network, and out of the box? How do I make sure that my completion handler is processing requests that are finished and waiting to be acted upon? Or maybe something completely else is causing this issue.
Upvotes: 0
Views: 81