Denis Moskvin

Reputation: 3

Trying to build a "lock-free" data structure in C++

I am trying to implement a class that gives each calling thread the data attached to it. Threads can register and unregister with the class. Since registering, unregistering, and reconciling the class's buffers are quite rare events, I lock a mutex for them. In the regular case, when clients call getBuffer() and there are no incoming register/unregister requests, I just check the atomic bool variable _bufReconciled. In practice, though, I hit situations where a thread is registered and its data has been added to the map, but another thread can't find this element (the debugger shows that the element is in the map). I suppose that not all of the data in std::map _userThreadToData is synchronized between threads due to memory coherence, but I don't know how to fix this. The code below reproduces the problem:

#include <iostream>
#include <atomic>
#include <thread>
#include <mutex>
#include <map>
#include <memory>   // std::unique_ptr
#include <string>   // std::string
#include <vector>
#include <cassert>
#include <exception>

struct UserData
{
    std::string _data1;
    std::string _data2;
};

class ThreadBufferManager
{

    std::map <std::thread::id, std::unique_ptr<UserData>> _userThreadToData;
    std::vector <std::thread::id> _pendingToUnregisterThread;
    std::vector <std::thread::id> _pendingToRegisterThread;

    std::atomic <bool> _bufReconciled;
    std::mutex _processingThreads;

    public:

    ThreadBufferManager ()
    {
        _bufReconciled = true;
    }

    void registerThread ()
    {
        std::lock_guard <std::mutex> lock (_processingThreads);
        _bufReconciled = false;
        _pendingToRegisterThread.push_back(std::this_thread::get_id());
    }

    void unregisterThread ()
    {
        std::lock_guard <std::mutex> lock (_processingThreads);
        _bufReconciled = false;
        _pendingToUnregisterThread.push_back(std::this_thread::get_id());
    }

    UserData& getBuffer ()
    {

        if (!_bufReconciled.load())
        {
                std::lock_guard<std::mutex> lock (_processingThreads);
                for (const auto& threadToUnregister : _pendingToUnregisterThread)
                {
                     std::cout << "REM" << threadToUnregister << std::endl;
                    _userThreadToData.erase(threadToUnregister);
                }

                for (const auto& threadToRegister : _pendingToRegisterThread)
                {
                    std::cout << "ADD" << threadToRegister << std::endl;
                    _userThreadToData.emplace(threadToRegister, std::unique_ptr<UserData> (new UserData));
                }

                _pendingToUnregisterThread.clear();
                _pendingToRegisterThread.clear();
                _bufReconciled = true;

       }


      auto it =  _userThreadToData.find (std::this_thread::get_id());
      if (it == _userThreadToData.end())
      {
            std::cout << "ERR" << std::this_thread::get_id() << std::endl;
            std::terminate();
      }
      return *it->second;
    }

};

void threadFoo (ThreadBufferManager& tbm)
{
    tbm.registerThread();
    for (std::size_t i = 0; i < 100; ++i)
    {
        tbm.getBuffer();
    }
    tbm.unregisterThread();
}


int main()
{
    ThreadBufferManager tbm;

    while (true)
    {
         std::thread thread1 (threadFoo, std::ref(tbm));
         std::thread thread2 (threadFoo, std::ref(tbm));
         std::thread thread3 (threadFoo, std::ref(tbm));
         std::thread thread4 (threadFoo, std::ref(tbm));
         std::thread thread5 (threadFoo, std::ref(tbm));
         std::thread thread6 (threadFoo, std::ref(tbm));
         std::thread thread7 (threadFoo, std::ref(tbm));
         std::thread thread8 (threadFoo, std::ref(tbm));
         std::thread thread9 (threadFoo, std::ref(tbm));
         std::thread thread10 (threadFoo, std::ref(tbm));
         std::thread thread11 (threadFoo, std::ref(tbm));
         std::thread thread12 (threadFoo, std::ref(tbm));
         std::thread thread13 (threadFoo, std::ref(tbm));

         thread1.join();
         thread2.join();
         thread3.join();
         thread4.join();
         thread5.join();
         thread6.join();
         thread7.join();
         thread8.join();
         thread9.join();
         thread10.join();
         thread11.join();
         thread12.join();
         thread13.join();
    }
    return 0;
}

Upvotes: 0

Views: 256

Answers (1)

Mike Vine

Reputation: 9837

This won't work.

There is nothing stopping a thread accessing _userThreadToData [outside of a lock] whilst another is mutating the same member.

Suppose thread '1' gets all the way past the _bufReconciled check and is about to run this line:

auto it =  _userThreadToData.find (std::this_thread::get_id());

Then thread '2' comes along and runs everything up to and including:

_userThreadToData.emplace(threadToRegister, std::unique_ptr<UserData> (new UserData));

You say that you don't access the data unless there "are no incoming requests for registration", but that's not the case: you check whether there are incoming requests and then some time later you access the map. To write real lock-free code (which is very hard), that check and access must be a single atomic operation.

Just use locks.
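As a rough illustration of what "just use locks" could look like, here is a minimal sketch in which every access to the map, including the lookup, holds the same mutex. The names LockedThreadBufferManager and runDemo are hypothetical and not from the question, and returning a pointer instead of a reference is a simplification made for this sketch. It also drops the pending lists and registers directly, on the assumption that deferred reconciliation is no longer needed once every access is locked.

```cpp
#include <atomic>
#include <cassert>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

struct UserData
{
    std::string _data1;
    std::string _data2;
};

// Hypothetical simplified variant: the map is only ever touched under _mutex.
class LockedThreadBufferManager
{
    std::map<std::thread::id, std::unique_ptr<UserData>> _userThreadToData;
    std::mutex _mutex;

public:
    void registerThread()
    {
        std::lock_guard<std::mutex> lock(_mutex);
        _userThreadToData.emplace(std::this_thread::get_id(),
                                  std::unique_ptr<UserData>(new UserData));
    }

    void unregisterThread()
    {
        std::lock_guard<std::mutex> lock(_mutex);
        _userThreadToData.erase(std::this_thread::get_id());
    }

    // Returns nullptr if the calling thread is not registered.
    // The pointer stays valid after the lock is released because the
    // UserData lives on the heap and only its own thread erases the entry.
    UserData* getBuffer()
    {
        std::lock_guard<std::mutex> lock(_mutex);
        auto it = _userThreadToData.find(std::this_thread::get_id());
        return it == _userThreadToData.end() ? nullptr : it->second.get();
    }
};

// Hypothetical driver mirroring threadFoo from the question.
// Returns the number of lookups that failed to find the caller's entry.
int runDemo(int threadCount, int iterations)
{
    LockedThreadBufferManager tbm;
    std::atomic<int> failures{0};
    std::vector<std::thread> workers;

    for (int t = 0; t < threadCount; ++t)
    {
        workers.emplace_back([&tbm, &failures, iterations] {
            tbm.registerThread();
            for (int i = 0; i < iterations; ++i)
            {
                if (tbm.getBuffer() == nullptr)
                    ++failures;
            }
            tbm.unregisterThread();
        });
    }

    for (auto& worker : workers)
        worker.join();

    return failures.load();
}
```

Because the lookup and the mutations are serialized by one mutex, no thread can run find() while another is in emplace() or erase(). If profiling later shows that lock to be a bottleneck, a reader/writer lock or per-thread storage might be worth considering, but that is a separate design decision.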

Upvotes: 5
