YoonSeok OH

Reputation: 745

How to make Thread Pool Tasks in abstract function type?

I'm trying to implement a very simple C++ thread pool, and so far I've confirmed that it works. However, I want to accept tasks in an abstract form. I've searched through dozens of articles, but none of them seem to cover what I want (perhaps my keywords weren't appropriate). Currently, only

void (*)()

form can be accepted as a task function. A simplified version of the code is below.

#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <Windows.h>

class CYSThreadPool
{
private:

    static std::thread** s_ppThreads;
    static long s_lThreadCount;
    static bool s_bJoin;
    static std::mutex* s_pMutexJoin;

    static std::queue<void (*)()> s_queueTasks;
    static std::mutex* s_pMutexTasks;

    CYSThreadPool(){}
    ~CYSThreadPool(){}

    static void ThreadFunction()
    {
        while (true)
        {
            if (!s_pMutexJoin->try_lock())
                continue;

            bool bJoin = s_bJoin;
            s_pMutexJoin->unlock();

            if (bJoin)
                break;

            if (!s_pMutexTasks->try_lock())
                continue;

            void (*fp)() = nullptr;
            if (s_queueTasks.size() > 0ull)
            {
                fp = s_queueTasks.front();
                s_queueTasks.pop();
            }

            s_pMutexTasks->unlock();

            if (fp != nullptr)
                fp();
        }
    }

public:

    enum class EResult : unsigned long
    {
        Success = 0ul,
        Fail_Undefined = 1ul,
        Fail_InvalidThreadCount = 2ul,
        Fail_ArgumentNull = 3ul
    };

    static const EResult Join()
    {
        if (s_ppThreads == nullptr)
        {
            if (s_lThreadCount == 0l)
                return EResult::Success;
            else
                return EResult::Fail_Undefined;
        }
        else
        {
            if (s_lThreadCount <= 0l)
                return EResult::Fail_Undefined;
            else
            {
                s_pMutexJoin->lock();
                s_bJoin = true;
                s_pMutexJoin->unlock();

                for (long i = 0l; i < s_lThreadCount; ++i)
                {
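                    s_ppThreads[i]->join();
                    delete s_ppThreads[i]; // free the std::thread object itself
                    s_ppThreads[i] = nullptr;
                }

                delete[] s_ppThreads; // allocated with new[], so release with delete[]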
                s_ppThreads = nullptr;
                s_lThreadCount = 0l;

                s_pMutexJoin->lock();
                s_bJoin = false;
                s_pMutexJoin->unlock();
            }
        }

        return EResult::Success;
    }

    static const EResult CreateThreads(const long _lThreadCount)
    {
        if (_lThreadCount < 0l)
            return EResult::Fail_InvalidThreadCount;

        if (Join() != EResult::Success)
            return EResult::Fail_Undefined;

        if (_lThreadCount == 0l)
            return EResult::Success;

        s_ppThreads = new std::thread*[_lThreadCount]{};

        for (long i = 0l; i < _lThreadCount; ++i)
            s_ppThreads[i] = new std::thread(ThreadFunction);

        s_lThreadCount = _lThreadCount;

        return EResult::Success;
    }

    static const EResult AddTask(void (*_fp)())
    {
        if (_fp == nullptr)
            return EResult::Fail_ArgumentNull;

        s_pMutexTasks->lock();
        s_queueTasks.push(_fp);
        s_pMutexTasks->unlock();

        return EResult::Success;
    }
};

std::thread** CYSThreadPool::s_ppThreads = nullptr;
long CYSThreadPool::s_lThreadCount = 0l;
bool CYSThreadPool::s_bJoin = false;
std::mutex* CYSThreadPool::s_pMutexJoin = new std::mutex();
std::queue<void (*)()> CYSThreadPool::s_queueTasks;
std::mutex* CYSThreadPool::s_pMutexTasks = new std::mutex();

void Test0()
{
    for (long i = 0l; i < 100000l; ++i)
    {
        std::cout << "A";
    }
}

int main()
{
    CYSThreadPool::EResult eResult = CYSThreadPool::Join();
    eResult = CYSThreadPool::CreateThreads(-1l);
    eResult = CYSThreadPool::CreateThreads(1l);
    eResult = CYSThreadPool::CreateThreads(0l);
    eResult = CYSThreadPool::CreateThreads(2l);

    CYSThreadPool::AddTask(Test0);

    Sleep(1000ul);

    CYSThreadPool::Join();

    return 0;
}

A kind and detailed answer would be truly appreciated!

EDIT:

What I meant by 'function in abstract form' is accepting function types other than void (*)(), such as void (*)(long), void (*)(ID3D12Resource1*, IDXGISwapChain*), or HResult (*)(ID3D12Device6*, ID3D12Resource1*, IDXGISwapChain*), and so on.

Thanks for the comment about using condition_variable instead of try_lock and wrapping fp() in a try-catch. I'm sorry, but I don't think I follow what you mean. May I ask for more detail?

Upvotes: 0

Views: 222

Answers (1)

freakish

Reputation: 56527

Here are a few things that would improve your code:

  1. Use std::function<void()> instead of void(*)(). This lets you queue any callable, not only function pointers. A function with arguments can be wrapped in a lambda, e.g. CYSThreadPool::AddTask([]() { Add(3, 1); });.
  2. Even better, make an abstract base class, say Task, with a virtual Run() method and queue that instead. You can derive FunctionPointerTask or StdFunctionTask from it if you want (the names are up to you). You can also create, say, a ProcessResource1Task that receives the necessary arguments in its constructor, so that Run() only has to execute the work.
  3. Your ThreadFunction workers are very inefficient: they spin on try_lock all the time, burning CPU even when the queue is empty. Don't. Use a condition_variable so that idle workers sleep until a task arrives.
  4. Even better, move the queueing logic into a separate thread-safe queue class.
  5. Your code is not exception-safe. Don't call lock/try_lock/unlock manually; use lock_guard or unique_lock instead. As it stands, if anything throws between lock() and unlock() (for example, s_queueTasks.push failing to allocate), the mutex is never released and the pool can deadlock. If you must lock manually, you have to wrap the locked region in try-catch.
  6. There are lots of static variables. Don't do that. Make all of them non-static members and add appropriate constructors and a destructor. Your thread pool, once joined, is completely unusable. Plus, Join() is not thread-safe (that would matter less if the thread pool wasn't completely static).
  7. You can improve performance by making bool s_bJoin a std::atomic<bool>. It is then not necessary to read or write it under a lock.
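As a rough illustration of items 1 and 2, here is a minimal sketch of how functions with different signatures can all be stored as one task type. It runs the tasks on the calling thread to keep the focus on the type erasure. Add, Scale, TaskFn, AddNumbersTask and the two Run... helpers are hypothetical names made up for this example, and std::make_unique assumes C++14 or later.

```cpp
#include <functional>
#include <memory>
#include <queue>
#include <utility>

// Hypothetical free functions with signatures other than void().
long Add(long a, long b) { return a + b; }
void Scale(long* pValue, long factor) { *pValue *= factor; }

// Item 1: every task is stored as a type-erased callable taking no arguments.
using TaskFn = std::function<void()>;

// Item 2: abstract task interface; the arguments live in the derived class.
class Task
{
public:
    virtual ~Task() = default;
    virtual void Run() = 0;
};

class AddNumbersTask : public Task
{
public:
    AddNumbersTask(long a, long b, long* pResult)
        : m_a(a), m_b(b), m_pResult(pResult) {}

    // Run() only executes the work; everything it needs was stored earlier.
    void Run() override { *m_pResult = Add(m_a, m_b); }

private:
    long m_a;
    long m_b;
    long* m_pResult;
};

// Queues two differently typed functions as lambdas and runs them in order.
long RunFunctionTasks()
{
    std::queue<TaskFn> tasks;
    long value = 0;

    // The lambdas bind the arguments and store the result.
    // In a real pool, 'value' must outlive the task that references it.
    tasks.push([&value]() { value = Add(3, 1); });   // wraps long(long, long)
    tasks.push([&value]() { Scale(&value, 10); });   // wraps void(long*, long)

    while (!tasks.empty())
    {
        TaskFn fn = std::move(tasks.front());
        tasks.pop();
        fn();
    }
    return value;
}

// Same idea with the abstract base class instead of std::function.
long RunObjectTasks()
{
    std::queue<std::unique_ptr<Task>> tasks;
    long sum = 0;
    tasks.push(std::make_unique<AddNumbersTask>(5, 6, &sum));

    while (!tasks.empty())
    {
        std::unique_ptr<Task> task = std::move(tasks.front());
        tasks.pop();
        task->Run();
    }
    return sum;
}
```

With this approach the queue only ever sees void() callables (or Task objects), so a function returning a value, such as an HRESULT, would write its result somewhere the caller can read it later.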

There might be some other issues, that's what I can see for now.

You can also view this code for a decent thread pool implementation. It is short and applies most of the hints above.
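Separately from that linked implementation, here is a minimal sketch of what items 3 to 6 could look like together: non-static members, a condition_variable instead of spinning, RAII locks, and a try-catch around the task call. SimpleThreadPool and CountWithPool are hypothetical names, and this is one possible design under those assumptions rather than a definitive one. Note that once a condition_variable is used, the stop flag in this sketch is a plain bool changed under the same mutex, so that a waiting worker cannot miss the wake-up; the atomic flag from item 7 applies to the original polling design.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical sketch of a non-static pool; not the linked implementation.
class SimpleThreadPool
{
public:
    explicit SimpleThreadPool(std::size_t threadCount)
    {
        for (std::size_t i = 0; i < threadCount; ++i)
            m_threads.emplace_back([this]() { WorkerLoop(); });
    }

    // Lets the workers finish the tasks already queued, then joins them.
    ~SimpleThreadPool()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_stop = true;
        }
        m_cv.notify_all();
        for (std::thread& thread : m_threads)
            thread.join();
    }

    SimpleThreadPool(const SimpleThreadPool&) = delete;
    SimpleThreadPool& operator=(const SimpleThreadPool&) = delete;

    void AddTask(std::function<void()> task)
    {
        {
            // lock_guard unlocks automatically, even if push() throws.
            std::lock_guard<std::mutex> lock(m_mutex);
            m_tasks.push(std::move(task));
        }
        m_cv.notify_one(); // wake one sleeping worker
    }

private:
    void WorkerLoop()
    {
        while (true)
        {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(m_mutex);
                // Sleeps (no spinning) until there is work or a stop request.
                m_cv.wait(lock, [this]() { return m_stop || !m_tasks.empty(); });
                if (m_tasks.empty())
                    return; // stop requested and nothing left to run
                task = std::move(m_tasks.front());
                m_tasks.pop();
            }
            // The task runs outside the lock. An exception escaping a thread
            // function calls std::terminate, so it is caught here.
            try
            {
                task();
            }
            catch (...)
            {
                // This sketch drops the exception; real code might log it.
            }
        }
    }

    std::vector<std::thread> m_threads;
    std::queue<std::function<void()>> m_tasks;
    std::mutex m_mutex;
    std::condition_variable m_cv;
    bool m_stop = false; // protected by m_mutex
};

// Usage sketch: runs taskCount increments on threadCount workers.
long CountWithPool(std::size_t threadCount, long taskCount)
{
    std::atomic<long> counter{0};
    {
        SimpleThreadPool pool(threadCount);
        for (long i = 0; i < taskCount; ++i)
            pool.AddTask([&counter]() { ++counter; });
    } // the destructor drains the queue and joins the workers
    return counter.load();
}
```

Because the pool is an ordinary object, creating a new pool after the old one is destroyed replaces the CreateThreads/Join pair, and no Sleep call is needed to wait for queued tasks.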

Upvotes: 2
