Bubblycentaur
Bubblycentaur

Reputation: 11

How to cancel a particular worker thread spawned using std::async based on request

My application needs to spawn more than one worker thread on request. It should also provide an option to cancel a specific long-running worker thread on request (identified by an Id).

Using a std::atomic_bool with interruption points, I can cancel all the worker threads, but I am not able to cancel only the requested one. How can I cancel a specific worker thread rather than all of them?

/**
 * Worker routine: for itr in [0, count) it pauses 100 ms, then computes and
 * prints x + y + itr, stopping early once cancellation has been requested.
 *
 * @param x        first addend
 * @param y        second addend
 * @param count    maximum number of iterations to run
 * @param isCancel cancellation flag, polled before every iteration; when it
 *                 is true the loop exits and a cancellation message is printed
 */
void doSomething(const int x, const int y, const int count, std::atomic_bool& isCancel)
{
    for (int itr = 0; (itr < count) && !isCancel; itr++)
    {
        // Fully qualified so the function does not depend on a
        // `using namespace std;` being in effect.
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        const int myValue = x + y + itr;
        std::cout << "MyValue: " << myValue << std::endl;
    }

    if (isCancel)
    {
        std::cout << "cancel updated: exit with cancel exit " << std::endl;
    }
}

int main()
{
    // A single flag is handed (by reference) to every worker below, so
    // setting it requests cancellation of all of them at once.
    std::atomic_bool cancel_token{false};
    std::vector<std::future<void>> tasklist;

    // Launch three workers, two seconds apart.
    for (int i = 0; i < 3; i++)
    {
        std::this_thread::sleep_for(std::chrono::seconds(2));
        tasklist.push_back(std::async(std::launch::async, doSomething, 1, 2, 250, std::ref(cancel_token)));
    }

    std::this_thread::sleep_for(std::chrono::seconds(4));
    cancel_token = true;

    // Collect every worker; get() blocks until that worker has returned.
    for (auto& pending : tasklist)
    {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        pending.get();
    }

    return 0;
}

I'm trying to cancel a specific worker thread from the list of threads spawned using std::async.

Expectation: my code should be able to cancel a specific worker thread rather than all of them. Thank you.

Upvotes: 0

Views: 70

Answers (1)

bbalazs
bbalazs

Reputation: 195

You can do something like this. With this approach, you're not just able to cancel the task, but you can also notify it to do some work. Moreover, it's a bit faster than a mutex + condition variable. Note that std::atomic's wait/notify functions require C++20.

#include <atomic>
#include <chrono>
#include <iostream>
#include <memory>
#include <string>
#include <thread>

using namespace std::chrono_literals;

class cancellable_task
{
public:

    virtual ~cancellable_task() = default;

    bool is_running() const
    {
        return _state->flag & 0b01;
    }

    void cancel()
    {
        _state->flag = 0b00;
        _state->flag.notify_one();
    }

    void notify()
    {
        _state->flag |= 0b10;
        _state->flag.notify_one();
    }

    void wait()
    {
        _state->flag.wait(0b01);
        _state->flag &= 0b01;
    }
    
    virtual void operator()() = 0;

protected:

    cancellable_task():
        _state{new shared_state()}
    {}
    
private:

    struct shared_state
    {
        std::atomic<unsigned short> flag{0b01};
    };

    std::shared_ptr<shared_state> _state;

};


/// Example task: prints an iteration counter each time it is woken up and
/// reports when it has been cancelled.
struct task: public cancellable_task
{
    int counter{0};

    /// Runs until is_running() becomes false, blocking in wait() after
    /// every printed iteration.
    void operator()() override
    {
        for (; is_running(); wait())
        {
            std::cout << "iteration: " << counter << std::endl;
            ++counter;
        }
        std::cout << "cancelled" << std::endl;
    }
};
   
    
int main()
{
    task my_task;
    // std::thread stores its own copy of the callable it is given.
    std::thread worker{my_task};

    // Wake the worker twice, one second apart.
    constexpr int wakeups = 2;
    for (int sent = 0; sent < wakeups; ++sent)
    {
        std::this_thread::sleep_for(1s);
        my_task.notify();
    }

    // After one more second, ask the worker to stop and wait for it.
    std::this_thread::sleep_for(1s);
    my_task.cancel();

    worker.join();
    return 0;
}

Output:

iteration: 0
iteration: 1
iteration: 2
cancelled

Upvotes: 0

Related Questions