Reputation: 1583
I wrote a test for ZeroMQ to convince myself that it manages to map replies to the client independent from processing order, which would prove it thread safe.
It is a multi-threaded server, which just throws the received messages back at the sender. The client sends some messages from several threads and checks, if it receives the same message back. For multi-threading I use OpenMP.
That test worked fine and I wanted to move on and re-implement it with C++ bindings for ZeroMQ. And now it doesn't work in the same way anymore.
Here's the code with ZMQPP:
#include <gtest/gtest.h>
#include <zmqpp/zmqpp.hpp>
#include <zmqpp/proxy.hpp>
TEST(zmqomp, order) {
zmqpp::context ctx;
std::thread proxy([&ctx] {
zmqpp::socket dealer(ctx, zmqpp::socket_type::xrequest);
zmqpp::socket router(ctx, zmqpp::socket_type::xreply);
router.bind("tcp://*:1234");
dealer.bind("inproc://workers");
zmqpp::proxy(router, dealer);
});
std::thread worker_starter([&ctx] {
#pragma omp parallel
{
zmqpp::socket in(ctx, zmqpp::socket_type::reply);
in.connect("inproc://workers");
#pragma omp for
for (int i = 0; i < 1000; i++) {
std::string request;
in.receive(request);
in.send(request);
}
}
});
std::thread client([&ctx] {
#pragma omp parallel
{
zmqpp::socket out(ctx, zmqpp::socket_type::request);
out.connect("tcp://localhost:1234");
#pragma omp for
for (int i = 0; i < 1000; i++) {
std::string msg("Request " + std::to_string(i));
out.send(msg);
std::string reply;
out.receive(reply);
EXPECT_EQ(reply, msg);
}
}
});
client.join();
worker_starter.join();
ctx.terminate();
proxy.join();
}
The test blocks and doesn't get executed to the end. I played around with #pragma
s a little bit and found out that only one change can "fix" it:
//#pragma omp parallel for
for (int i = 0; i < 250; i++) {
The code is still getting executed parallel in that case, but I have to divide the loop executions number by a number of my physical cores.
Does anybody have a clue what's going on here?
Upvotes: 1
Views: 669
Reputation: 1
This normally does not matter as there are some safe-guarding design practices, but situation here goes even worse, once following the proposed TEST(){...}
design.
Having spent some time with ZeroMQ, your proposal headbangs due to violations on several principal things, that otherwise help distributed architectures to work smarter, than a pure SEQ
of monolithic code.
ZeroMQ convinces in ( almost ) every third paragraph to avoid sharing of resources. Zero-sharing is one of the ZeroMQ's fabulous scalable performance and minimised latency maxims, so to say in short.
So one has better to avoid sharing zmq.Context()
instance at all ( unless one knows pretty well, why and how the things work under the hood ).
Thus an attempt to fire 1000-times ( almost ) in parallel ( well, not a true PAR
) some flow of events onto a shared instance of zmq.Context
( the less once it was instantiated with default parameters and having none performance tuning adaptations ) will certainly suffer from doing the very opposite from what is, performance-wise and design-wise, recommended to do.
1) Each zmq.Context()
instance has a limited amount of I/O-threads, that were created during the instantiation process. Once a fair design needs some performance-tuning, it is possible to increase such number of I/O-threads and data-pumps will work that better ( sure, none amount of data-pumps will salvage a poor, the less a disastrous design / architecture of a distributed computing system. This is granted. ).
2) Each zmq.Socket()
instance has an { implicit | explicit } mapping onto a respective I/O-thread ( Ref. 1) ). Once a fair design needs some increased robustness against sluggish event-loop handlings or against other adverse effects arisen from data-flow storms ( or load-balancing or you name it ), there are chances to benefit from a divide-and-conquer approach to use .setsockopt( zmq.AFFINITY, ... )
method to directly map each zmq.Socket()
instance onto a respective I/O-thread, and remain thus in control of what buffering and internal queues are fighting for which resources during the real operations. In any case, where a total amount of threads goes over the localhost number of cores, the just-CONCURRENT scheduling is obvious ( so a dream of a true PAR
execution is principally and inadvertently lost. This is granted. ).
3) Each zmq.Socket()
has also a pair of "Hidden Queue Devastators", called High-Watermarks. These get set either { implicitly | explicitly }, the latter being for sure a wiser manner for performance tuning. Why Devastators? Because these stabilise and protect the distributed computing systems from overflows and are permitted to simply discard each and every message above the HWM
level(s) so as to protect the systems capability to run forever, even under heavy storms, spurious blasts of crippled packets or DDoS-types of attack. There are many tools for tuning this domain of ZeroMQ Context()-instance's behaviour, which go beyond the scope of this answer ( Ref.: other my posts on ZeroMQ AFFINITY
benefits or the ZeroMQ API specifications used in .setsockopt()
method ).
4) Each tcp://
transport-class based zmq.Socket()
instance has also inherited some O/S dependent heritage. Some O/S demonstrate this risk by extended accumulation of ip-packets ( outside of any ZeroMQ control ) until some threshold got passed and thus a due design care ought be taken for such cases to avoid adverse effects on the intended application signalling / messaging dynamics and robustness against such uncontrollable ( exosystem ) buffering habits.
5) Each .recv()
and .send()
method call is by-definition blocking, a thing a massively distributed computing system ought never risk to enter into. Never ever. Even in a school-book example. Rather use non-blocking form of these calls. Always. This is granted.
6) Each zmq.Socket()
instance ought undertake a set of careful and graceful termination steps. A preventive step of .setsockopt( zmq.LINGER, 0 )
+ an explicit .close()
methods are fair to be required to be included in every use-case ( and made robust to get executed irrespective of any exceptions that may get appeared. ). A poor { self- | team- }-discipline in this practice is a sure ticket into hanging up the whole application infrastructure due to just not paying due care on a mandatory resources management policy. This is a must-have part of any serious distributed computing Project. Even the school-book examples ought have this. No exceptions. No excuse. This is granted.
Upvotes: 1