Vladimir Bershov

Reputation: 2832

std::async analogue for specified thread

I need to work with several objects, where each operation may take a lot of time.

The processing cannot be done in the GUI (main) thread, from which I start it.

I need to make all communication with these objects asynchronous, something similar to std::async with std::future, or QtConcurrent::run() with QFuture in my main framework (Qt 5), but neither of them lets me select the thread. I always need to work with a given object (objects == devices) in one and the same additional thread,

because:

  1. I need a universal solution and don't want to make each class thread-safe.
  2. For example, even if I made a thread-safe wrapper for QSerialPort, a serial port in Qt cannot be accessed from more than one thread anyway:

Note: The serial port is always opened with exclusive access (that is, no other process or thread can access an already opened serial port).

  3. Communication with a device usually consists of transmitting a command and receiving an answer. I want to process each answer exactly in the place where the request was sent, and I don't want to rely on event-driven-only logic.

So, my question.

How can the function be implemented?

MyFuture<T> fut = myAsyncStart(func, &specificLiveThread);

It is necessary that the same live thread can be passed many times.

Upvotes: 2

Views: 1284

Answers (3)

Mike van Dyke

Reputation: 2868

If you want to follow the Active Object approach, here is an example using templates.

The WorkPackage class and its interface exist only to store functions with different return types in one container (see the ActiveObject::async member function further down):

#include <atomic>
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>

class IWorkPackage {
    public:
        virtual void execute() = 0;

        virtual ~IWorkPackage() {

        }
};

template <typename R>
class WorkPackage : public IWorkPackage{
    private:
        std::packaged_task<R()> task;
    public:
        WorkPackage(std::packaged_task<R()> t) : task(std::move(t)) {

        }

        void execute() final {
            task();
        }

        std::future<R> get_future() {
            return task.get_future();
        }
};

Here's the ActiveObject class, which expects your device type as a template parameter. It holds a queue that stores the method requests for the device, and a worker thread that executes those methods one after another in the order they were submitted. The async member function is used to request a method call on the device:

template <typename Device>
class ActiveObject {
    private:
        Device servant;
        std::thread worker;
        std::deque<std::unique_ptr<IWorkPackage>> work_queue; // FIFO: requests run in submission order
        std::atomic<bool> done;
        std::mutex queue_mutex;
        std::condition_variable cv;
        void worker_thread() {
            while(done.load() == false) {
                std::unique_ptr<IWorkPackage> wp;
                {
                    std::unique_lock<std::mutex> lck {queue_mutex};

                    cv.wait(lck, [this] {return !work_queue.empty() || done.load() == true;});
                    if(done.load() == true) continue;

                    wp = std::move(work_queue.front());
                    work_queue.pop_front();
                }

                if(wp) wp->execute();
            }
        }
    public:

        ActiveObject(): done(false) {
            worker = std::thread {&ActiveObject::worker_thread, this};
        }

        ~ActiveObject() {
            {
                std::unique_lock<std::mutex> lck{queue_mutex};
                done.store(true);
            }
            cv.notify_one();
            worker.join();
        }

        template<typename R, typename ...Args, typename ...Params>
        std::future<R> async(R (Device::*function)(Params...), Args... args) {
            std::unique_ptr<WorkPackage<R>> wp {new WorkPackage<R> {std::packaged_task<R()> { std::bind(function, &servant, args...) }}};
            std::future<R> fut = wp->get_future();
            {
                std::unique_lock<std::mutex> lck{queue_mutex};
                work_queue.push_back(std::move(wp));
            }
            cv.notify_one();

            return fut;
        }
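
        // Overload for const member functions (e.g. QSerialPort::parity() const);
        // without it, async calls to const methods such as the one in the usage
        // example below would not compile.
        template<typename R, typename ...Args, typename ...Params>
        std::future<R> async(R (Device::*function)(Params...) const, Args... args) {
            std::unique_ptr<WorkPackage<R>> wp {new WorkPackage<R> {std::packaged_task<R()> { std::bind(function, &servant, args...) }}};
            std::future<R> fut = wp->get_future();
            {
                std::unique_lock<std::mutex> lck{queue_mutex};
                work_queue.push_back(std::move(wp));
            }
            cv.notify_one();

            return fut;
        }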

        // In case you want to call some functions directly on the device
        Device* operator->() {
            return &servant;
        }

};

You can use it as follows:

ActiveObject<QSerialPort> ao_serial_port;
const qint64 size = 4096; // example buffer size

// direct call:
ao_serial_port->setReadBufferSize(size);

// async call:
std::future<void> buf_future = ao_serial_port.async(&QSerialPort::setReadBufferSize, size);

std::future<QSerialPort::Parity> parity_future = ao_serial_port.async(&QSerialPort::parity);

// Maybe do some other work here

buf_future.get(); // wait until the call has completed
QSerialPort::Parity p = parity_future.get(); // blocks if the result is not ready yet, i.e. if the method has not finished executing

EDIT to answer the question in the comments: The AO is mainly a concurrency pattern for multiple readers/writers. As always, its use depends on the situation, and the pattern is commonly used in distributed systems/network applications, for example when multiple clients request a service from a server. The clients benefit from the AO pattern because they are not blocked while waiting for the server to answer. One reason why this pattern is not used as often in fields other than network apps might be the thread overhead: creating a thread for every active object results in a lot of threads, and thus thread contention, if the number of CPUs is low and many active objects are used at once.
I can only guess why people think it is a strange issue: as you already found out, it does require some additional programming. Maybe that's the reason, but I'm not sure.
But I think the pattern is also very useful for other reasons and use cases, such as yours, where the main thread (and also other background threads) require a service from singleton-like objects, for example devices or hardware interfaces, which are only available in small numbers, are slow in their computations, and require concurrent access without the callers being blocked while waiting for a result.

Upvotes: 4

Rames

Reputation: 938

Let me answer without referring to the Qt library, since I don't know its threading API.

In the C++11 standard library there is no straightforward way to reuse a created thread: a thread executes a single function and can only be joined or detached. However, you can implement what you want with the producer-consumer pattern: a consumer thread executes tasks (represented as std::function objects, for instance) that producer threads place in a queue. So, if I understand you correctly, you need a single-threaded thread pool, as sketched below.
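
Such a queue can be sketched in plain C++11 as follows. This is only an illustration of the pattern, not the library linked below; the names SingleThreadQueue and post are made up:

#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>

class SingleThreadQueue {
public:
    SingleThreadQueue() : worker([this] { run(); }) {}

    ~SingleThreadQueue() {
        {
            std::lock_guard<std::mutex> lock(queue_mutex);
            done = true;
        }
        cv.notify_one();
        worker.join();
    }

    // Producer side: schedule f() on the single worker thread, return a future for its result.
    template <typename F>
    auto post(F f) -> std::future<decltype(f())> {
        using R = decltype(f());
        // std::packaged_task is move-only; hold it in a shared_ptr so the
        // wrapping lambda stays copyable and fits into std::function.
        auto task = std::make_shared<std::packaged_task<R()>>(std::move(f));
        std::future<R> fut = task->get_future();
        {
            std::lock_guard<std::mutex> lock(queue_mutex);
            tasks.push_back([task] { (*task)(); });
        }
        cv.notify_one();
        return fut;
    }

private:
    // Consumer side: one thread pops and runs the queued tasks in FIFO order.
    void run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(queue_mutex);
                cv.wait(lock, [this] { return done || !tasks.empty(); });
                if (done && tasks.empty()) return;
                task = std::move(tasks.front());
                tasks.pop_front();
            }
            task();
        }
    }

    std::deque<std::function<void()>> tasks;
    std::mutex queue_mutex;
    std::condition_variable cv;
    bool done = false;
    std::thread worker;  // declared last so the other members exist before the thread starts
};

int main() {
    SingleThreadQueue queue;
    // Both tasks run on the same worker thread, one after another.
    std::future<int> first = queue.post([] { return 4; });
    std::future<int> second = queue.post([] { return 2; });
    std::cout << first.get() + second.get() << '\n';  // prints 6
}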

I can recommend my C++14 implementation of thread pools as task queues. It isn't widely used (yet!), but it is covered by unit tests and has been checked with ThreadSanitizer multiple times. The documentation is sparse, but feel free to ask anything in the GitHub issues!

Library repository: https://github.com/Ravirael/concurrentpp

And your use case:

#include <task_queues.hpp>
#include <future>

int main() {
    // The single threaded task queue object - creates one additional thread.
    concurrent::n_threaded_fifo_task_queue queue(1);

    // Add tasks to queue, task is executed in created thread.
    std::future<int> future_result = queue.push_with_result([] { return 4; });

    // Blocks until task is completed.
    int result = future_result.get();

    // Executes task on the same thread as before.
    std::future<int> second_future_result = queue.push_with_result([] { return 4; });
}

Upvotes: 7

MSalters

Reputation: 179809

It's Qt. Its signal-slot mechanism is thread-aware. On your secondary (non-GUI) thread, create a QObject-derived class with an execute slot. Signals connected to this slot will be marshalled as events to that thread.

Note that this QObject can't be a child of a GUI object, since children need to live in their parent's thread, and this object explicitly does not live in the GUI thread.

You can handle the result using the usual std::promise and std::future logic, as sketched below.
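
A minimal sketch of that idea, assuming Qt 5.10 or later (for the functor overload of QMetaObject::invokeMethod); the bare QObject worker and the returned value 42 are placeholders for a real device object and a real call:

#include <QCoreApplication>
#include <QObject>
#include <QThread>
#include <future>

int main(int argc, char** argv) {
    QCoreApplication app(argc, argv);

    QThread thread;
    QObject worker;                 // deliberately not parented to any GUI object
    worker.moveToThread(&thread);
    thread.start();                 // QThread's default run() spins an event loop

    std::promise<int> promise;
    std::future<int> result = promise.get_future();

    // Queued invocation: the lambda is executed in 'thread', because that is
    // where 'worker' lives. Requires Qt >= 5.10 for this invokeMethod overload.
    QMetaObject::invokeMethod(&worker, [&promise] {
        promise.set_value(42);      // stand-in for the real device call
    }, Qt::QueuedConnection);

    int value = result.get();       // blocks the calling thread until the task has run
    Q_UNUSED(value);

    thread.quit();
    thread.wait();
    return 0;
}

In a real GUI application you would not block on get() in the main thread; poll the future or report completion back with a signal instead.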

Upvotes: 3
