Kungfunk

Message loss with lock-free MPMC queue when threads exit

Issue

I am seeing inconsistent message loss while using an MPMC lock-free queue (concurrentqueue) in the implementation of a mutual exclusion algorithm. In particular, I have noticed the following:

  1. No messages are lost before the termination procedure begins.
  2. If the critical section code is minimal (a short sleep of a few milliseconds), the termination procedure loses messages, so threads hang waiting on sockets.
  3. If the critical section does some real work (for example, sending a few TCP requests to an unrelated socket), the termination procedure succeeds and all nodes shut down gracefully.

Relevant Code/Setup

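I've had trouble coming up with a minimal example, but the basic architecture is roughly as follows. Each node runs a coordinator thread, a writer thread, and one reader thread per peer socket. For message passing, the readers enqueue incoming messages onto the coordinator's input queue, and the coordinator enqueues outgoing (node id, message) pairs onto the writer's output queue.

Here are the parts of the code I believe are relevant: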

Application

for (int i = 0; i < 10; i++) {
    client.cs_enter();  // blocks until this node may enter the critical section
    // do_some_work();
    std::this_thread::sleep_for(std::chrono::milliseconds(5));
    client.cs_leave();
}

// wait for threads

Message

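#include <cstdint>

enum MessageKind { REQUEST, REPLY, CS_ENTER, CS_LEAVE, DONE, TERMINATE };
struct Message {
    MessageKind kind;
    uint32_t node;
    uint32_t timestamp;
    static Message cs_enter();  // factory helpers used below (definitions omitted)
    static Message done();
};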

Writer

while (true) {
    std::pair<uint32_t, Message> tm({ 0, Message::cs_enter() });  // placeholder, overwritten by wait_dequeue
    q->wait_dequeue(tm);

    // MAX node means no more messages to send
    if (tm.first == std::numeric_limits<uint32_t>::max()) break;

    // map contains node_id -> socket_fd mapping
    send_message((*map)[tm.first], tm.second);
}
// exit thread
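For comparison, here is a minimal, self-contained sketch of the same sentinel-based writer shutdown. It assumes a mutex/condition-variable queue (BlockingQueue, a hypothetical stand-in for concurrentqueue's blocking queue), replaces Message with an int payload, and records messages in a vector instead of calling send_message; kStopNode, run_writer, and writer_demo are also hypothetical names. With a single producer and a FIFO queue, everything enqueued before the sentinel is dequeued before it, so the writer handles all of it before exiting.

```cpp
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <functional>
#include <limits>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for a blocking MPMC queue: a deque guarded by a
// mutex, with a condition variable so consumers block while it is empty.
template <typename T>
class BlockingQueue {
public:
    // Push an item and wake one waiting consumer.
    void enqueue(T item) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push_back(std::move(item));
        }
        ready_.notify_one();
    }

    // Block until an item is available, then move it into `out`.
    void wait_dequeue(T& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return !items_.empty(); });
        out = std::move(items_.front());
        items_.pop_front();
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::deque<T> items_;
};

// Hypothetical sentinel, mirroring the "MAX node" convention in the writer.
constexpr uint32_t kStopNode = std::numeric_limits<uint32_t>::max();

// Writer loop: drain (node, payload) pairs until the sentinel arrives.
void run_writer(BlockingQueue<std::pair<uint32_t, int>>& q,
                std::vector<std::pair<uint32_t, int>>& sent) {
    while (true) {
        std::pair<uint32_t, int> item{0, 0};
        q.wait_dequeue(item);
        if (item.first == kStopNode) break;  // no more messages to send
        sent.push_back(item);                // stands in for send_message()
    }
}

// One producer enqueues `count` messages, then the sentinel, then exits.
std::vector<std::pair<uint32_t, int>> writer_demo(int count) {
    BlockingQueue<std::pair<uint32_t, int>> q;
    std::vector<std::pair<uint32_t, int>> sent;
    std::thread writer(run_writer, std::ref(q), std::ref(sent));
    std::thread producer([&q, count] {
        for (int i = 0; i < count; ++i) q.enqueue({static_cast<uint32_t>(i % 3), i});
        q.enqueue({kStopNode, -1});
    });
    producer.join();
    writer.join();  // after join, reading `sent` is race-free
    return sent;
}
```

For example, writer_demo(100) should return all 100 payloads in the order they were enqueued.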

Readers

while (true) {
    Message m = recv_message(fd);
    MessageKind k = m.kind;
    q->enqueue(m);
    if (k == TERMINATE) break;
}
// exit thread

Coordinator

while(true) {
    if (this->done && this->terminated.size() == this->config->nodes.size() - 1)
        break;

    Message m = Message::cs_enter();
    this->input_queue->wait_dequeue(m);
    switch (m.kind) {
        // handle message based on kind
    }
}
// inform writer no more work to do
this->output_queue->enqueue({ std::numeric_limits<uint32_t>::max(), Message::done() });
// exit thread
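The reader/coordinator termination handshake can be isolated the same way. This sketch again assumes a hypothetical mutex-based BlockingQueue rather than concurrentqueue, plus a trimmed-down, hypothetical Msg struct carrying a TERMINATE flag, the reader id, and a per-reader sequence number. Each reader thread enqueues its messages, then TERMINATE, and exits; the coordinator stops once it has seen TERMINATE from every reader, then reports how many ordinary messages arrived and whether any reader's messages came out of order.

```cpp
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <mutex>
#include <set>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for a blocking MPMC queue (mutex + condition variable).
template <typename T>
class BlockingQueue {
public:
    void enqueue(T item) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push_back(std::move(item));
        }
        ready_.notify_one();
    }
    void wait_dequeue(T& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return !items_.empty(); });
        out = std::move(items_.front());
        items_.pop_front();
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::deque<T> items_;
};

// Hypothetical, trimmed-down message.
struct Msg {
    bool terminate;  // true for the reader's final TERMINATE message
    uint32_t from;   // id of the reader thread that enqueued it
    int seq;         // per-reader sequence number
};

// Returns the number of non-TERMINATE messages the coordinator received.
// Sets in_order to false if any reader's messages arrived out of sequence.
int coordinator_demo(uint32_t readers, int per_reader, bool& in_order) {
    BlockingQueue<Msg> input;
    std::vector<std::thread> threads;
    for (uint32_t r = 0; r < readers; ++r) {
        // Reader: enqueue messages, then TERMINATE, then the thread exits.
        threads.emplace_back([&input, r, per_reader] {
            for (int i = 0; i < per_reader; ++i) input.enqueue({false, r, i});
            input.enqueue({true, r, per_reader});
        });
    }
    std::vector<int> next(readers, 0);  // next expected seq per reader
    std::set<uint32_t> terminated;
    int received = 0;
    in_order = true;
    // Coordinator: stop once every reader has sent TERMINATE.
    while (terminated.size() < readers) {
        Msg m{false, 0, 0};
        input.wait_dequeue(m);
        if (m.seq != next[m.from]) in_order = false;
        ++next[m.from];
        if (m.terminate) terminated.insert(m.from);
        else ++received;
    }
    for (auto& t : threads) t.join();
    return received;
}
```

Running a harness like this against the real queue type and Message struct is one way to check whether messages disappear inside the queue or somewhere in the protocol's shutdown sequence.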

I'm at a complete loss as to how to proceed. Arbitrarily increasing the critical section's execution time is a flimsy solution, and I have no idea how well it will hold up in all the environments this implementation will be tested in. I also don't understand why increasing the critical section's execution time works in the first place, since that code is unrelated to the protocol.

Do the semantics of my code not guarantee message delivery? More precisely, does the following code guarantee the consumer will receive the message?

// Producer - thread 1
q->enqueue(message);
// thread 1 exits immediately afterwards

// Consumer - thread 2
Message m;
q->wait_dequeue(m);

Or do I need something more to guarantee this property (memory fences, etc.)?
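For a queue whose enqueue and wait_dequeue synchronize through a mutex, as in the hypothetical BlockingQueue stand-in below, the item is delivered even if the producer thread has already exited: the queue object outlives the thread, and the mutex unlock/lock pair orders the producer's write before the consumer's read, so no extra fences are needed. This sketch does not exercise concurrentqueue itself, whose guarantees would have to be checked against its own documentation.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

// Hypothetical stand-in for a blocking MPMC queue (mutex + condition variable).
template <typename T>
class BlockingQueue {
public:
    void enqueue(T item) {
        {
            std::lock_guard<std::mutex> lock(mutex_);  // unlock at scope end publishes the item
            items_.push_back(std::move(item));
        }
        ready_.notify_one();
    }
    void wait_dequeue(T& out) {
        std::unique_lock<std::mutex> lock(mutex_);  // lock sees everything published before
        ready_.wait(lock, [this] { return !items_.empty(); });
        out = std::move(items_.front());
        items_.pop_front();
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::deque<T> items_;
};

// The producer enqueues one message and its thread finishes; the consumer
// only starts waiting after the producing thread has been joined.
std::string receive_after_producer_exit() {
    BlockingQueue<std::string> q;
    std::thread producer([&q] { q.enqueue("hello"); });  // enqueue, then the thread ends
    producer.join();                                      // producer thread no longer exists
    std::string m;
    q.wait_dequeue(m);  // the item lives in the queue object, not in the thread
    return m;
}

// Same, but the consumer thread is started first, so it is usually
// already blocked in wait_dequeue when the producer runs.
std::string receive_while_blocked() {
    BlockingQueue<std::string> q;
    std::string m;
    std::thread consumer([&q, &m] { q.wait_dequeue(m); });
    std::thread producer([&q] { q.enqueue("world"); });
    producer.join();
    consumer.join();  // join makes the consumer's write to m visible here
    return m;
}
```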
