I am getting intermittent message loss while using an MPMC lock-free queue (concurrentqueue) in the implementation of a mutual exclusion algorithm. In particular, I have noticed the following:
I've had trouble coming up with a minimal example, but the basic architecture is as follows:
As for message passing, we have:
Here are the parts of the code I believe are relevant:
```cpp
// Main (client) thread
for (int i = 0; i < 10; i++) {
    client.cs_enter();  // Blocks until ready
    // do_some_work()
    sleep(5);
    client.cs_leave();
}
// wait for threads
```
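```cpp
#include <cstdint>

enum MessageKind { REQUEST, REPLY, CS_ENTER, CS_LEAVE, DONE, TERMINATE };

struct Message {
    MessageKind kind;
    uint32_t node;
    uint32_t timestamp;
    // (static factories such as Message::cs_enter() and Message::done() omitted)
};
```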
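The loops below call `Message::cs_enter()` and `Message::done()`, which the struct as posted doesn't declare. Here is a minimal sketch of what those factories presumably look like, assuming they only set `kind`; the zeroed `node` and `timestamp` values are a guess, not taken from the real code:

```cpp
#include <cstdint>

enum MessageKind { REQUEST, REPLY, CS_ENTER, CS_LEAVE, DONE, TERMINATE };

struct Message {
    MessageKind kind;
    std::uint32_t node;
    std::uint32_t timestamp;

    // Hypothetical factories: only `kind` is known from how they are used;
    // node and timestamp are zeroed as placeholders.
    static constexpr Message cs_enter() { return Message{CS_ENTER, 0, 0}; }
    static constexpr Message done() { return Message{DONE, 0, 0}; }
};
```

In the loops, these values only serve as placeholders that `wait_dequeue` overwrites, so their exact contents shouldn't matter.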
```cpp
// Writer thread: forwards outgoing messages to the other nodes
while (true) {
    std::pair<uint32_t, Message> tm({ 0, Message::cs_enter() });
    q->wait_dequeue(tm);
    // MAX node means no more messages to send
    if (tm.first == std::numeric_limits<uint32_t>::max()) break;
    // map contains node_id -> socket_fd mapping
    send_message((*map)[tm.first], tm.second);
}
// exit thread
```
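```cpp
// Reader thread: receives messages from socket `fd` and pushes them onto the input queue
while (true) {
    Message m = recv_message(fd);
    MessageKind k = m.kind;
    q->enqueue(m);
    if (k == TERMINATE) break;
}
// exit thread
```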
```cpp
// Protocol thread: processes incoming messages
while (true) {
    if (this->done && this->terminated.size() == this->config->nodes.size() - 1)
        break;
    Message m = Message::cs_enter();
    this->input_queue->wait_dequeue(m);
    switch (m.kind) {
        // handle message based on kind
    }
}
// inform writer no more work to do
this->output_queue->enqueue({ std::numeric_limits<uint32_t>::max(), Message::done() });
// exit thread
```
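The shutdown handshake between the protocol thread and the writer relies on an in-band sentinel: everything enqueued before the `max()` node id should be sent before the writer stops. The sketch below isolates that pattern. `BlockingQueue` is a hypothetical mutex plus `std::condition_variable` stand-in (not concurrentqueue), and `run_writer_demo` is a hypothetical helper that records pairs instead of calling `send_message`. The stand-in is strictly FIFO; concurrentqueue's README describes FIFO ordering only among items from the same producer, so with the real queue the sentinel is only guaranteed to arrive last relative to messages enqueued by the same thread.

```cpp
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <limits>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for a blocking MPMC queue: strictly FIFO across all producers.
template <typename T>
class BlockingQueue {
public:
    void enqueue(T item) {
        {
            std::lock_guard<std::mutex> lock(mu_);
            items_.push_back(std::move(item));
        }
        cv_.notify_one();
    }
    void wait_dequeue(T& out) {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait(lock, [this] { return !items_.empty(); });
        out = std::move(items_.front());
        items_.pop_front();
    }
private:
    std::mutex mu_;
    std::condition_variable cv_;
    std::deque<T> items_;
};

constexpr std::uint32_t kStop = std::numeric_limits<std::uint32_t>::max();

// Simulates the writer loop: drains (node, payload) pairs until the sentinel
// node id arrives. "Sending" just records the pair.
std::vector<std::pair<std::uint32_t, int>> run_writer_demo(int n) {
    BlockingQueue<std::pair<std::uint32_t, int>> out_q;
    std::vector<std::pair<std::uint32_t, int>> sent;

    std::thread writer([&] {
        while (true) {
            std::pair<std::uint32_t, int> tm{0, 0};
            out_q.wait_dequeue(tm);
            if (tm.first == kStop) break;  // no more messages to send
            sent.push_back(tm);
        }
    });

    // Single producer (like the protocol thread): real messages, then the sentinel.
    for (int i = 0; i < n; ++i) {
        out_q.enqueue({static_cast<std::uint32_t>(i % 3), i});
    }
    out_q.enqueue({kStop, -1});

    writer.join();  // join makes `sent` safe to read here
    return sent;
}
```

With a single producer, `run_writer_demo(100)` returns all 100 pairs in enqueue order, and none are dropped by the shutdown.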
I'm at a complete loss as to how to proceed. Arbitrarily increasing the critical section's execution time is a flimsy workaround, and I have no idea how well it will hold up in all the environments this implementation will be tested in. I also don't understand why lengthening the critical section helps in the first place, since the critical section is unrelated to the protocol code.
Do the semantics of my code not guarantee message delivery? More precisely, does the following code guarantee that the consumer will receive the message?
```cpp
// Producer - thread 1
q->enqueue(message);
thread_exit();

// Consumer - thread 2
Message m;
q->wait_dequeue(m);
```
Or do I need something more (memory fences, etc.) to guarantee this property?
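At the language level, a thread exiting does not undo writes it has already published: if the producer's enqueue acts as a release operation and the consumer's successful dequeue is the matching acquire, everything written before the enqueue is visible to the consumer afterwards. The sketch below shows that mechanism in isolation with a one-slot handoff built on `std::atomic<bool>`. `one_slot_handoff` is a hypothetical helper, not how concurrentqueue is implemented; whether the library gives `wait_dequeue` this guarantee is exactly what is being asked.

```cpp
#include <atomic>
#include <cstdint>
#include <thread>

enum MessageKind { REQUEST, REPLY, CS_ENTER, CS_LEAVE, DONE, TERMINATE };

struct Message {
    MessageKind kind;
    std::uint32_t node;
    std::uint32_t timestamp;
};

// Hypothetical one-slot handoff: the producer writes the payload, publishes it
// with a release store, and exits; the consumer waits for the flag with an
// acquire load. Release/acquire makes the payload write happen-before the read.
Message one_slot_handoff() {
    Message slot{};
    std::atomic<bool> ready{false};
    Message received{};

    std::thread consumer([&] {
        while (!ready.load(std::memory_order_acquire)) {
            std::this_thread::yield();  // spin politely until published
        }
        received = slot;
    });

    std::thread producer([&] {
        slot = Message{REPLY, 7, 42};
        ready.store(true, std::memory_order_release);
        // The lambda returns here, so the producer thread exits immediately.
    });

    producer.join();
    consumer.join();  // join makes `received` safe to read below
    return received;
}
```

In a queue, `enqueue` and a successful `wait_dequeue` play the roles of the release store and the acquire load, so no extra fences should be needed in the calling code as long as the queue itself provides that pairing.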