Reputation: 18864
The task: do an HGETALL on every key matching a pattern (potentially millions of keys).
I fetch the keys on one connection and execute HGETALL concurrently on another. This still does not eliminate the HGETALL round-trip latency, which I would like to get rid of completely.
What I want to do is push HGETALL requests with an in-flight window like there is no tomorrow.
I know I can do a single request with multiple HGETALL-s in it*, but then I still need to wait for a response and once in a while pay that latency. *- although I have yet to figure out how to shape the response type for such a request so that it is not statically sized.
Is there a better way?
I am using coroutine syntax, so the code currently looks like:
auto request = redis::request{};
request.push("HGETALL", key);
auto response = redis::response<std::vector<std::string>>{};
co_await conn->async_exec(request, response, asio::deferred);
Thanks!
Upvotes: 1
Views: 102
Reputation: 21
I couldn't find a solution using boost::redis::generic_response, so I used boost::redis::response itself to read the responses of pipelined requests. I used tuple parameterization as shown here: Parameterize of tuple with repeated type. My code looks like the following:
template <typename T, typename Seq>
struct expander;

template <typename T, std::size_t... Is>
struct expander<T, std::index_sequence<Is...>> {
    template <typename E, std::size_t>
    using elem = E;

    using type = boost::redis::response<elem<T, Is>...>;
};

template <std::size_t N, class Type>
struct my_tuple {
    using type = typename expander<Type, std::make_index_sequence<N>>::type;
};
The following is a snippet from a Google Test case demonstrating pipelining and reading the responses with boost::redis::response:
{
    request req;
    response<boost::redis::ignore_t> resp;
    req.push("HSET", "testkey1", "name", "abc");
    conn->async_exec(req, resp, yield[ec]);
    ASSERT_EQ((bool)ec, false);
}
{
    request req;
    response<boost::redis::ignore_t> resp;
    req.push("HSET", "testkey2", "name", "def");
    conn->async_exec(req, resp, yield[ec]);
    ASSERT_EQ((bool)ec, false);
}
{
    std::vector<std::string> fields;
    fields.push_back("name");

    request req;
    req.push_range("HMGET", "testkey1", fields);
    req.push_range("HMGET", "testkey2", fields);
    ASSERT_EQ(req.get_commands(), 2);

    my_tuple<2, std::vector<std::string>>::type resp;
    conn->async_exec(req, resp, yield[ec]);
    ASSERT_EQ((bool)ec, false);

    std::cout << "HMGET: " << std::get<0>(resp).value()[0] << std::endl;
    std::cout << "HMGET: " << std::get<1>(resp).value()[0] << std::endl;
}
Upvotes: 1
Reputation: 392833
Looking at things, you can probably get everything you want from redis::generic_response. It's going to be some work, but if you know your target application and the commands are all the same, you will probably be able to make it work without too much effort:
redis::request request;
for (auto const& key : keys)
    request.push("HGETALL", key);
request.push("QUIT");

redis::generic_response response;
co_await conn->async_exec(request, response, asio::deferred);

auto const& gen = response.value();
fmt::print("generic: {}\n", gen);
It might be advantageous for the interpretation side to batch in a transaction (I'm not a Redis expert, so I have no idea whether this hurts server performance):
redis::request request;
request.push("MULTI");
for (auto const& key : keys)
    request.push("HGETALL", key);
request.push("EXEC");
request.push("QUIT");
Using simple formatters to display the structure of the responses:
#include <boost/asio.hpp>
#include <boost/redis/src.hpp>
#include <fmt/ostream.h>
#include <fmt/ranges.h>

namespace redis = boost::redis;
namespace asio  = boost::asio;

template <> struct fmt::formatter<redis::resp3::type> : fmt::ostream_formatter {};
template <> struct fmt::formatter<redis::resp3::node> : fmt::formatter<std::string> {
    template <typename FormatContext>
    auto format(redis::resp3::node const& node, FormatContext& ctx) const {
        return format_to(ctx.out(), "({}@{}, {}, {})", node.data_type, node.depth,
                         node.value, node.aggregate_size);
    }
};

asio::awaitable<void> use_redis(std::vector<std::string> keys) {
    auto conn = std::make_shared<redis::connection>(co_await asio::this_coro::executor);
    redis::config cfg;
    conn->async_run(cfg, {}, asio::consign(asio::detached, conn));

    {
        redis::request request;
        for (auto const& key : keys)
            request.push("HGETALL", key);
        request.push("QUIT");

        redis::generic_response response;
        co_await conn->async_exec(request, response, asio::deferred);
        fmt::print("generic: {}\n", response.value());
    }

    conn->cancel();
    conn->async_run(cfg, {}, asio::consign(asio::detached, conn));

    {
        redis::request request;
        request.push("MULTI");
        for (auto const& key : keys)
            request.push("HGETALL", key);
        request.push("EXEC");
        request.push("QUIT");

        redis::generic_response response;
        co_await conn->async_exec(request, response, asio::deferred);
        fmt::print("multi: {}\n", response.value());
    }

    conn->cancel();
}

int main() {
    asio::io_context ioc(1);
    co_spawn(ioc, use_redis({"foo", "bar"}), asio::detached);
    ioc.run();
}
With a local demo:
HSET foo bar baz
HSET bar qux quuuz
$ redis-cli
127.0.0.1:6379> HGETALL foo
1) "bar"
2) "baz"
127.0.0.1:6379> HGETALL bar
1) "qux"
2) "quuuz"
127.0.0.1:6379>
$ ./build/sotest
generic: [(map@0, , 1), (blob_string@1, bar, 1), (blob_string@1, baz, 1), (map@0, , 1), (blob_string@1, qux, 1), (blob_string@1, quuuz, 1), (simple_string@0, OK, 1)]
multi: [(simple_string@0, OK, 1), (simple_string@0, QUEUED, 1), (simple_string@0, QUEUED, 1), (array@0, , 2), (map@1, , 1), (blob_string@2, bar, 1), (blob_string@2, baz, 1), (map@1, , 1), (blob_string@2, qux, 1), (blob_string@2, quuuz, 1), (simple_string@0, OK, 1)]
Upvotes: 1