Loki Astari

Reputation: 264331

Behavior of C++ std::thread

I am in the process of writing an article about using blocking vs. non-blocking sockets. I am currently doing some experiments using threads and blocking sockets, and have turned up some interesting results that I am not sure how to explain.

Note: I know that modern servers use an event-driven model with non-blocking sockets to achieve much better performance, and I am working in that direction, but I want to get the baseline numbers first.

The question I think I should ask is below, but any input on what is happening, what I should actually be asking, or what I need to time/measure/examine would be gratefully accepted.

Set Up

Experiments are running on Amazon:

Instance Type   vCPUs   Memory (GiB)   Storage (GB)   Network
c3.2xlarge      8       15             2 x 80 SSD     High

I am using siege to load test the server:

> wc data.txt
   0       1      32 data.txt
> siege --delay=0.001 --time=1m --concurrent=<concurrency> -H 'Content-Length: 32'  -q '<host>/message POST < data.txt'

The Servers:

I have four versions of the code, each the most basic type of HTTP server: no matter what you request, you get the same response (this is basically to test throughput).

  1. Single Threaded.
  2. Multi Threaded
    Each accepted request is handled by its own std::thread, which is then detached.
  3. Multi Threaded with Pool
    A fixed-size thread pool of std::thread. Each accepted request creates a job that is added to a job queue for processing by the thread pool.
  4. Multi Threaded using std::async()
    Each accepted request is executed via std::async(); the future is stored in a queue. A secondary thread waits for each future to complete before discarding it.

Expectations

Actual Results

Actual concurrency levels tried:

1, 2, 4, 8, 16, 32, 48, 64, 80, 96, 112, 128, 144, 160, 176, 192, 208, 224, 240, 255

[Graphs: results for a 200-byte response and a 2000-byte response]

I was surprised at the performance of the multi-threaded version, so I doubled the size of the thread pool in the pool version to see what happened.

ThreadQueue     jobs(std::thread::hardware_concurrency());
// Changed this line to:
ThreadQueue     jobs(std::thread::hardware_concurrency() * 2);

That's why you see two lines for the thread pool version in the graphs.

Need Help

It's not unexpected that the version using the standard library's std::async() is the best. But I am totally dumbfounded that the multi-threaded version has basically the same performance.

This version (multi-threaded) creates a new thread for every accepted incoming connection and then simply detaches it (allowing it to run to completion). As the concurrency reaches 255, we will have 255 background threads running in the process.

So the question:

Given the short runtime of Sock::worker(), I cannot believe the cost of creating a thread is negligible in comparison to this work. Also, because this version maintains performance similar to std::async(), it seems to suggest that some thread re-use is going on behind the scenes.

Does anybody have any knowledge of the standard's requirements for thread re-use, and what re-use behavior I should expect?

At what point will the blocking model break down? At 255 concurrent requests I was not expecting the threading model to keep up. I obviously need to reset my expectations here.
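
To put a number on the thread-creation cost, I can time raw create/join with a loop like this (a minimal sketch; 1000 iterations is an arbitrary choice):

#include <chrono>
#include <iostream>
#include <thread>

int main()
{
    constexpr int count = 1000;
    auto start = std::chrono::steady_clock::now();
    for(int loop = 0; loop < count; ++loop)
    {
        std::thread worker([](){});   // Empty job: measures thread create + join only.
        worker.join();
    }
    auto end = std::chrono::steady_clock::now();
    auto us  = std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
    std::cout << (us / count) << " microseconds per thread create/join\n";
}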

Code

The socket wrapper code is a very thin layer over standard sockets (it just throws exceptions when things go wrong). The current code is here if needed, but I don't think it matters.

The full source of this code is available here.

Sock::worker

This is the bit of code common to all the servers. It receives an accepted socket object (via move) and writes the data object back over that socket.

void worker(DataSocket&& accepted, ServerSocket& server, std::string const& data, int& finished)
{
    DataSocket  accept(std::move(accepted));
    HTTPServer  acceptHTTPServer(accept);
    try
    {
        std::string message;
        acceptHTTPServer.recvMessage(message);
        // std::cout << message << "\n";
        if (!finished && message == "Done")
        {
            finished = 1;
            server.stop();
            acceptHTTPServer.sendMessage("", "Stoped");
        }
        else
        {
            acceptHTTPServer.sendMessage("", data);
        }
    }
    catch(DropDisconnectedPipe const& e)
    {
        std::cerr << "Pipe Disconnected: " << e.what() << "\n";
    }
}

Single Thread

int main(int argc, char* argv[])
{
    // Builds a string that is sent back with each response.
    std::string          data     = Sock::commonSetUp(argc, argv);
    int                  finished = 0;
    Sock::ServerSocket   server(8080);

    while(!finished)
    {
        Sock::DataSocket  accept  = server.accept();
        // Simply sends "data" back over http.
        Sock::worker(std::move(accept), server, data, finished);
    }
}

Multi Thread

int main(int argc, char* argv[])
{
    std::string          data     = Sock::commonSetUp(argc, argv);
    int                  finished = 0;
    Sock::ServerSocket   server(8080);

    while(!finished)
    {
        Sock::DataSocket  accept  = server.accept();

        std::thread work(Sock::worker, std::move(accept), std::ref(server), std::ref(data), std::ref(finished));
        work.detach();
    }
}

Multi Thread with Queue

int main(int argc, char* argv[])
{
    std::string          data     = Sock::commonSetUp(argc, argv);
    int                  finished = 0;
    Sock::ServerSocket   server(8080);

    std::cerr << "Concurrency: " << std::thread::hardware_concurrency() << "\n";
    ThreadQueue     jobs(std::thread::hardware_concurrency());

    while(!finished)
    {
        Sock::DataSocket  accept  = server.accept();

        // Had some issues with storing a lambda that captured
        // a move only object so I created WorkJob as a simple
        // functor instead of the lambda.
        jobs.startJob(WorkJob(std::move(accept), server, data, finished));
    }
}
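
As an aside on the comment above: the usual culprit is that std::function requires a copy-constructible callable, so a lambda with a move-only capture cannot be stored in one. If a C++23 compiler were available, std::move_only_function would lift that restriction (a minimal sketch, deliberately unrelated to the Sock:: types):

#include <deque>
#include <functional>
#include <memory>

int main()
{
    std::deque<std::move_only_function<void()>> jobs;

    auto resource = std::make_unique<int>(42);
    // This move-only capture would not compile as a std::function.
    jobs.emplace_back([r = std::move(resource)](){ /* use *r */ });

    for(auto& job: jobs)
    {
        job();
    }
}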

Then the auxiliary code that controls the pool:

class WorkJob
{
    Sock::DataSocket    accept;
    Sock::ServerSocket& server;
    std::string const&  data;
    int&                finished;
    public:
        WorkJob(Sock::DataSocket&& accept, Sock::ServerSocket& server, std::string const& data, int& finished)
            : accept(std::move(accept))
            , server(server)
            , data(data)
            , finished(finished)
        {}
        WorkJob(WorkJob&& rhs)
            : accept(std::move(rhs.accept))
            , server(rhs.server)
            , data(rhs.data)
            , finished(rhs.finished)
        {}
        void operator()()
        {
            Sock::worker(std::move(accept), server, data, finished);
        }
};
class ThreadQueue
{
    using WorkList = std::deque<WorkJob>;

    std::vector<std::thread>    threads;
    std::mutex                  safe;
    std::condition_variable     cond;
    WorkList                    work;
    int                         finished;

    void doWork()
    {
        while(!finished)
        {
            std::unique_lock<std::mutex>     lock(safe);
            cond.wait(lock, [this](){return !(this->work.empty() && !this->finished);});
            if (work.empty())
            {
                // Woken for shutdown with nothing left to run.
                continue;
            }
            WorkJob job = std::move(work.front());
            work.pop_front();
            lock.unlock();
            job();
        }
    }

    public:
        void startJob(WorkJob&& item)
        {
            std::unique_lock<std::mutex>     lock(safe);
            work.push_back(std::move(item));
            cond.notify_one();
        }

        ThreadQueue(int count)
            : threads(count)
            , finished(false)
        {
            for(int loop = 0;loop < count; ++loop)
            {
                threads[loop] = std::thread(&ThreadQueue::doWork, this);
            }
        }
        ~ThreadQueue()
        {
            {
                std::unique_lock<std::mutex>     lock(safe);
                finished = true;
            }
            cond.notify_all();
            // Join the workers: destroying a joinable std::thread calls std::terminate().
            for(auto& thread: threads)
            {
                thread.join();
            }
        }
};

Async

int main(int argc, char* argv[])
{
    std::string          data     = Sock::commonSetUp(argc, argv);
    int                  finished = 0;
    Sock::ServerSocket   server(8080);

    FutureQueue          future(finished);

    while(!finished)
    {
        Sock::DataSocket  accept  = server.accept();

        future.addFuture([accept = std::move(accept), &server, &data, &finished]() mutable {Sock::worker(std::move(accept), server, data, finished);});
    }
}

Auxiliary class to tidy up the futures:

class FutureQueue
{
    using MyFuture   = std::future<void>;
    using FutureList = std::list<MyFuture>;

    int&                        finished;
    FutureList                  futures;
    std::mutex                  mutex;
    std::condition_variable     cond;
    std::thread                 cleaner;

    void waiter()
    {
        while(!finished)
        {
            std::future<void>   next;
            {
                std::unique_lock<std::mutex> lock(mutex);
                cond.wait(lock, [this](){return !(this->futures.empty() && !this->finished);});
                if (!futures.empty())
                {
                    next = std::move(futures.front());
                    futures.pop_front();
                }
            }
            if (next.valid())
            {
                next.wait();
            }
        }

    }
    public:
        FutureQueue(int& finished)
            : finished(finished)
            , cleaner(&FutureQueue::waiter, this)
        {}
        ~FutureQueue()
        {
            cleaner.join();
        }

        template<typename T>
        void addFuture(T&& lambda)
        {
            std::unique_lock<std::mutex> lock(mutex);
            futures.push_back(std::async(std::launch::async, std::forward<T>(lambda)));
            cond.notify_one();
        }
};

Upvotes: 21

Views: 1686

Answers (3)

Dan R

Reputation: 1494

This application is certainly going to be I/O-bound rather than CPU-bound: the vast majority of the time spent processing any single request is spent waiting in blocking I/O operations, not actually doing computation.

So having more threads (up to a point, likely well past 256 of them) is going to be faster, because it allows more concurrent I/O on different sockets as the threads swap off the CPUs. As a rough illustration, if a request spends 10 ms blocked on the network for every 0.1 ms of computation, 8 cores could in principle keep on the order of 800 such requests in flight.

In other words, it's not the 8 cores that are the bottleneck, it's the socket communication. So you want to parallelize that as much as possible (or use non-blocking I/O).

Upvotes: 1

Damian

Reputation: 4631

If you want to scale linearly across more than about 8 cores, you need to look at lock-free algorithms. This means implementing compare-and-swap methods for the data you need to share between the threads.

There is a great book by Anthony Williams, C++ Concurrency in Action: Practical Multithreading that will show you some of the ideas.
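
For a flavour of what compare-and-swap looks like in standard C++ (a generic sketch using std::atomic, not tied to the code in the question):

#include <atomic>

struct Node
{
    int   value;
    Node* next;
};

// Lock-free push onto a shared singly linked list (the classic Treiber-stack push).
void push(std::atomic<Node*>& head, int value)
{
    Node* node = new Node{value, head.load(std::memory_order_relaxed)};
    // On failure, compare_exchange_weak reloads the current head into
    // node->next, so the loop simply retries after losing a race.
    while(!head.compare_exchange_weak(node->next, node,
                                      std::memory_order_release,
                                      std::memory_order_relaxed))
    {
    }
}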

Upvotes: 0

Nicol Bolas

Reputation: 473192

What matters is not the cost of creating a thread vs. the cost of the work. What matters is the cost of your thread queue and work distribution relative to the total cost of thread creation and command execution.

In particular, I find myself disconcerted by this in your thread queue dispatch logic:

    std::unique_lock<std::mutex>     lock(safe);
    cond.wait(lock, [this](){return !(this->work.empty() && !this->finished);});

Every job's execution will be preceded by this lock. This makes getting something out of your thread queue expensive. Furthermore, a similar lock is taken when adding a job, and there is only one thread adding jobs; if it blocks, the queue is starved of new work.

Indeed, if any of these block, somebody's probably going to be left in the cold. That's probably why adding more threads to the queue helped; it decreased the chance of someone having to block.

I don't know the deep details of asynchronous programming, but I think lock-free queues would be better for performance.
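
Short of going fully lock-free, one common mitigation (a generic sketch, not your code) is to have each worker swap the entire pending queue into a local container in one locked operation and then drain it without holding the mutex:

#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>

class SwapQueue
{
    std::mutex                         safe;
    std::condition_variable            cond;
    std::deque<std::function<void()>>  work;
    bool                               finished = false;

    public:
        void push(std::function<void()> job)
        {
            {
                std::lock_guard<std::mutex>  lock(safe);
                work.push_back(std::move(job));
            }
            cond.notify_one();
        }
        // Take every queued job under one lock acquisition, then run
        // them all without touching the mutex again.
        void drain()
        {
            std::deque<std::function<void()>> local;
            {
                std::unique_lock<std::mutex>  lock(safe);
                cond.wait(lock, [this](){return !work.empty() || finished;});
                local.swap(work);
            }
            for(auto& job: local)
            {
                job();
            }
        }
};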

Upvotes: 0
