Doug

Reputation: 805

Best way to handle multi-thread cleanup

I have a server-type application, and I have an issue with making sure threads aren't deleted before they complete. The code below pretty much represents my server; the cleanup is required to prevent a build-up of dead threads in the list.

using namespace std;

class A {
public:
    void doSomethingThreaded(function<void()> cleanupFunction, function<bool()> getStopFlag) {
       somethingThread = thread([cleanupFunction, getStopFlag, this]() {
          doSomething(getStopFlag);
          cleanupFunction();
       });

    }
private:
    void doSomething(function<bool()> getStopFlag);
    thread somethingThread;
    ...
};

class B {
public:
    void runServer();

    void stop() {
        stopFlag = true;
        waitForListToBeEmpty();
    }
private:
    void waitForListToBeEmpty() { ... };
    void handleAccept(...) {
        shared_ptr<A> newClient(new A());
        { 
            unique_lock<mutex> lock(listMutex);
            clientData.push_back(newClient);
        }
        newClient->doSomethingThreaded(bind(&B::cleanup, this, newClient), [this]() {
            return stopFlag;
        });
    }

    void cleanup(shared_ptr<A> data) {
        unique_lock<mutex> lock(listMutex);
        clientData.remove(data);
    }

    list<shared_ptr<A>> clientData;
    mutex listMutex;
    atomic<bool> stopFlag;
};

The issue seems to be that the destructors run in the wrong order, i.e. the shared_ptr is destroyed when the thread's function completes, meaning the 'A' object is deleted before the thread has completed, causing havoc when the thread's destructor is called.

i.e.

1. Call the cleanup function.
2. All references to this (i.e. an A object) are removed, so call its destructor (including this thread's destructor).
3. Call this thread's destructor again -- OH NOES!

I've looked at alternatives, such as maintaining a 'to be removed' list which another thread periodically uses to clean the primary list, or using a time-delayed deleter function for the shared pointers, but both of these seem a bit clunky and could have race conditions.

Anyone know of a good way to do this? I can't see an easy way of refactoring it to work ok.

Upvotes: 4

Views: 5312

Answers (3)

Doug

Reputation: 805

For those who are interested, I took a bit of both answers given (i.e. James' detach suggestion, and Chris' suggestion about shared_ptrs).

My resulting code looks like this; it seems neater and doesn't crash on shutdown or client disconnect:

using namespace std;

class A {
public:
    void doSomething(function<bool()> getStopFlag) {
        ...
    }
private:
    ...
};

class B {
public:
    void runServer();

    void stop() {
        stopFlag = true;
        waitForListToBeEmpty();
    }
private:
    void waitForListToBeEmpty() { ... };
    void handleAccept(...) {
        shared_ptr<A> newClient(new A());
        { 
            unique_lock<mutex> lock(listMutex);
            clientData.push_back(newClient);
        }
        thread clientThread([this, newClient]() { 
            // Capture the shared_ptr until thread over and done with.

            newClient->doSomething([this]() {
                return stopFlag;
            });
            cleanup(newClient);
        });
        // Detach to remove the need to store these threads until their completion.
        clientThread.detach();
    }

    void cleanup(shared_ptr<A> data) {
        unique_lock<mutex> lock(listMutex);
        clientData.remove(data);
    }

    list<shared_ptr<A>> clientData; // Can remove this if you don't 
                                    // need to connect with your clients.
                                    // However, you'd need to make sure this 
                                    // didn't get deallocated before all clients 
                                    // finished as they reference the boolean stopFlag
                                    // OR make it a shared_ptr to an atomic boolean
    mutex listMutex;
    atomic<bool> stopFlag;
};
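
The comment in the listing above mentions making the stop flag a shared_ptr to an atomic boolean, so that detached threads never reference a destroyed B. A minimal sketch of that idea follows; `Server`, `spawnWorker` and `activeCount` are hypothetical names used only for illustration, and the polling loop stands in for the real client work.

```cpp
#include <atomic>
#include <chrono>
#include <memory>
#include <thread>

// Hypothetical stand-in for class B; only the stop-flag handling is shown.
class Server {
public:
    // Each detached worker copies the shared_ptrs, so the flag and the
    // counter stay valid even if the Server is destroyed first.
    void spawnWorker() {
        auto stop = stopFlag;        // copy of the shared_ptr, not of *this
        auto count = activeWorkers;
        ++*count;
        std::thread([stop, count]() {
            while (!stop->load()) {  // placeholder for the real work loop
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            }
            --*count;
        }).detach();
    }

    void stop() { stopFlag->store(true); }

    int activeCount() const { return activeWorkers->load(); }

private:
    std::shared_ptr<std::atomic<bool>> stopFlag =
        std::make_shared<std::atomic<bool>>(false);
    std::shared_ptr<std::atomic<int>> activeWorkers =
        std::make_shared<std::atomic<int>>(0);
};
```

Because the lambda captures no `this`, nothing in the worker depends on the lifetime of the server object itself.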

Upvotes: 0

Chris Dodd

Reputation: 126438

The issue is that, since you manage A via shared pointers, the this pointer captured by the thread lambda really needs to be a shared pointer rather than a raw pointer to prevent it from becoming dangling. The problem is that there's no easy way to create a shared_ptr from a raw pointer when you don't have an actual shared_ptr as well.

One way to get around this is to use shared_from_this:

class A : public enable_shared_from_this<A> {
public:
    void doSomethingThreaded(function<void()> cleanupFunction, function<bool()> getStopFlag) {
       somethingThread = thread([cleanupFunction, getStopFlag, this]() {
          shared_ptr<A> temp = shared_from_this();
          doSomething(getStopFlag);
          cleanupFunction();
       });
    }
    ...
};

This creates an extra shared_ptr to the A object, which keeps it alive until the thread finishes.

Note that you still have the problem with join/detach that James Kanze identified: every thread must have either join or detach called on it exactly once before it is destroyed. You can fulfill that requirement by adding a detach call to the thread lambda if you never care about the thread exit value.

You also have potential for problems if doSomethingThreaded is called multiple times on a single A object...

Upvotes: 2

James Kanze

Reputation: 154017

Are the threads joinable or detached? I don't see any detach, which means that destructing the thread object without having joined it is a fatal error. You might try simply detaching it, although this can make a clean shutdown somewhat complex. (Of course, for a lot of servers, there should never be a shutdown anyway.) Otherwise: what I've done in the past is to create a reaper thread; a thread which does nothing but join any outstanding threads, to clean up after them.

I might add that this is a good example of a case where shared_ptr is not appropriate. You want full control over when the delete occurs; if you detach, you can do it in the clean up function (but quite frankly, just using delete this; at the end of the lambda in A::doSomethingThreaded seems more readable); otherwise, you do it after you've joined, in the reaper thread.
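
To make the join/detach rule concrete, here is a small sketch (the function name is made up for illustration). It shows that a std::thread is joinable until exactly one of join() or detach() has been called on it, and only then is it safe to destroy.

```cpp
#include <thread>

// Returns true if both threads went from joinable to non-joinable.
bool joinOrDetachBeforeDestruction() {
    std::thread joined([] {});
    std::thread detached([] {});

    bool wasJoinable = joined.joinable() && detached.joinable();

    joined.join();      // blocks until the thread function returns
    detached.detach();  // lets the thread finish on its own

    // After join() or detach(), joinable() is false and the destructor is
    // safe. Destroying a still-joinable std::thread calls std::terminate.
    return wasJoinable && !joined.joinable() && !detached.joinable();
}
```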

EDIT:

For the reaper thread, something like the following should work:

#include <condition_variable>
#include <deque>
#include <mutex>

class ReaperQueue
{
    std::deque<A*> myQueue;
    std::mutex myMutex;
    std::condition_variable myCond;
    A* getOne()
    {
        std::unique_lock<std::mutex> lock( myMutex );
        // Block until a finished thread has been queued.
        myCond.wait( lock, [&] { return !myQueue.empty(); } );
        A* results = myQueue.front();
        myQueue.pop_front();
        return results;
    }
public:
    void readyToReap( A* finished_thread )
    {
        std::unique_lock<std::mutex> lock( myMutex );
        myQueue.push_back( finished_thread );
        myCond.notify_all();
    }

    void reaperThread()
    {
        for ( ; ; )
        {
            A* mine = getOne();
            mine->somethingThread.join();
            delete mine;
        }
    }
};

(Warning: I've not tested this, and I've tried to use the C++11 functionality. I've only actually implemented it, in the past, using pthreads, so there could be some errors. The basic principles should hold, however.)

To use it, create an instance, then start a thread calling reaperThread on it. In the cleanup of each thread, call readyToReap.

To support a clean shutdown, you may want to use two queues: you insert each thread into the first, as it is created, and then move it from the first to the second (which would correspond to myQueue, above) in readyToReap. To shut down, you then wait until both queues are empty (not starting any new threads in this interval, of course).

Upvotes: 3
