Reputation: 11
My application requires to spawn more than one worker thread based on request. Also it should provide an option to cancel long processing thread based on based on request (with Id).
Using std::atomic_bool, with interruption points, all the worker threads are cancelled, but not able to cancel requested one. How to achieve cancellation of specific worker thread rather than all.
void doSomething(const int x, const int y, const int count, std::atomic_bool& isCancel)
{
int myValue = 0;
for (int itr=0; (itr < count) && !isCancel; itr++)
{
this_thread::sleep_for(chrono::milliseconds(100));
//std::this_thread::sleep_for(std::chrono::seconds(4));
myValue = x + y + itr;
std::cout << "MyValue: " << myValue << std::endl;
}
if (isCancel)
{
std::cout << "cancel updated: exit with cancel exit " << std::endl;
}
}
int main()
{
std::atomic_bool cancel_token = ATOMIC_VAR_INIT(false);
std::vector<std::future<void>> tasklist;
std::vector<std::tuple<int, std::atomic_bool, std::future<void>>> mTaskList;
for (int i = 0; i < 3; i++)
{
std::this_thread::sleep_for(std::chrono::seconds(2));
tasklist.push_back(std::async(std::launch::async, doSomething, 1, 2, 250, std::ref(cancel_token)));
}
std::this_thread::sleep_for(std::chrono::seconds(4));
cancel_token = true;
for (int i = 0; i < tasklist.size(); i++)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
tasklist[i].get();
}
return 0;
}
I'm trying to cancel specific worker thread form list of spawned threads using std::async.
Expectation, my code should able to cancel an specific worker thread rather than all. Thank you
Upvotes: 0
Views: 70
Reputation: 195
You can do something like this. With this approach, you're not just able to cancel the task, but you can notify it do some work. Moreover, it's a bit faster than mutex + conditional variable.
#include <iostream>
#include <thread>
#include <chrono>
#include <memory>
#include <string>
using namespace std::chrono_literals;
class cancellable_task
{
public:
virtual ~cancellable_task() = default;
bool is_running() const
{
return _state->flag & 0b01;
}
void cancel()
{
_state->flag = 0b00;
_state->flag.notify_one();
}
void notify()
{
_state->flag |= 0b10;
_state->flag.notify_one();
}
void wait()
{
_state->flag.wait(0b01);
_state->flag &= 0b01;
}
virtual void operator()() = 0;
protected:
cancellable_task():
_state{new shared_state()}
{}
private:
struct shared_state
{
std::atomic<unsigned short> flag{0b01};
};
std::shared_ptr<shared_state> _state;
};
struct task: public cancellable_task
{
int counter{0};
void operator()() override
{
while (is_running())
{
std::cout << "iteration: " << counter++ << std::endl;
wait();
}
std::cout << "cancelled" << std::endl;
}
};
int main()
{
task my_task;
std::thread t{my_task};
std::this_thread::sleep_for(1s);
my_task.notify();
std::this_thread::sleep_for(1s);
my_task.notify();
std::this_thread::sleep_for(1s);
my_task.cancel();
t.join();
return 0;
}
Output:
iteration: 0
iteration: 1
iteration: 2
cancelled
Upvotes: 0