Uzer

Reputation: 3190

Async/future argument corruption in C++

So I am working on async C++ code and want to find a pattern that works well for partitioning a set of tasks, with each thread performing some fraction of the work.

The general idea of the code below is to create some dummy tasks as a vector, then call distributeTasksVec, which creates a vector of vectors of task ids that are evenly distributed among the async calls. The program then waits for the future results using get.

The program prints "params" followed by the contents of the list of task ids for each partition. In this case I have left the number of partitions set to 1. It prints in three locations: just before the definition of the async future, inside the closure of the future, and within the kernelTester function.

Interestingly, the console output I observe seems to show some corruption. I get output like the following:

params1 (as expected!) 0, 1, 2 ...... 9999,

params2 10000 (the below is not expected) -1996084928,21854,-1707205664,32734,0,0,0,0,8,9,10,11,12,13, .... 9999

params3 (the below is not expected) -1996084928,21854,-1707205664,32734,0,0,0,0,8,9,10 .... 9999

What on earth is happening to the vector as it is being copied to the thread? It seems to get corrupted consistently.

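#include <boost/filesystem.hpp>
#include <boost/algorithm/string.hpp>
#include <future>   // std::async, std::future
#include <iostream> // std::cout
#include <thread>
#include <random>
#include <iterator>
#include <algorithm>
#include <chrono>
#include <utility>  // std::pair, std::move
#include <vector>
#include <time.h>
using namespace std;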

const auto processor_count = std::thread::hardware_concurrency();

vector<int> getRange(int range)
{
    vector<int> indices;
    for (int i = 0; i < range; i++)
    {
        indices.push_back(i);
    }
    return indices;
}

vector<vector<int>> distributeTasksVec(vector<int> &tasks, int partitions)
{
    vector<vector<int>> taskDistibution = {};
    for (int i = 0; i < partitions; i++)
    {
        taskDistibution.push_back({});
    }
    int currentPartition = 0;
    for (int i = 0; i < tasks.size(); i++)
    {
        taskDistibution[currentPartition].push_back(i);

        currentPartition++;
        if (currentPartition % partitions == 0)
        {
            currentPartition = 0;
        }
    }
    return taskDistibution;
}

void centroidPrinter(vector<int> centroids)
{
    for (int i = 0; i < centroids.size(); i++)
    {
        std::cout << centroids[i] << ",";
    }
    std::cout << endl;
}

vector<pair<int, int>> kernelTester(vector<int> range)
{
    std::cout << "range " << range.size() << endl;
    std::cout << "params3 " << endl;
    centroidPrinter(range);
    return {};
}


void concurrentTester(vector<int> *_dataIndices, int processor_count)
{
    auto dataIndices = *_dataIndices;
    const int dataRowCount = dataIndices.size(); // data.rows;
    const int threadPool = processor_count;

    auto range = getRange(dataRowCount);
    auto distributedTasks = distributeTasksVec(range, threadPool);

    vector<std::future<std::vector<std::pair<int, int>>>> futures = {};
    for (int i = 0; i < threadPool; i++)
    {
        vector<int> thisTask = distributedTasks[i];

        std::cout << "params1 " << endl;
        centroidPrinter(thisTask);

        auto future = std::async(std::launch::async, [&thisTask]()
                             {
                                     std::cout << "params2 " << thisTask.size() << endl;

                                     centroidPrinter(thisTask);
                                     return kernelTester(std::move(thisTask));
                                 });
        futures.emplace_back(std::move(future));
        std::cout << "building thread " << i << " done" << endl;
    }

    std::cout << " about to get results " << endl;

    for (int i = 0; i < futures.size(); i++)
    {
        std::cout << "here 0" << endl;
        std::cout << futures[i].valid() << endl;
        auto data = futures[i].get();
        std::cout << "here 1" << endl;
        std::cout << "here END OF FUTURE" << endl;
    }
}

int main(int argc, char **argv)
{
    auto range = getRange(10000);
    concurrentTester(&range, 1);
}

Upvotes: 0

Views: 100

Answers (1)

NathanOliver

Reputation: 180435

The vector thisTask is local to the body of the loop for (int i = 0; i < threadPool; i++). The lambda expression that you use for the thread captures that vector by reference. The vector is destroyed at the end of every iteration of the loop, leaving your thread with a reference to an invalid object. Using that object is undefined behavior.

What you can do is move thisTask into the lambda like

[thisTask = std::move(thisTask)]() { ... }

and now the closure object owns the actual vector instead of just having a reference to it.
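
To make the init-capture concrete, here is a minimal sketch of the same loop with the vector moved into the closure. The names sumTasks, splitTasks and runPartitions are hypothetical stand-ins (sumTasks plays the role of kernelTester and just sums the ids so the result is easy to check); they are not from the question's code. The mutable keyword is an addition of this sketch: without it the captured vector is const inside the lambda body, so a std::move inside the body would silently fall back to a copy.

```cpp
#include <cassert>
#include <future>
#include <numeric>
#include <utility>
#include <vector>

// Hypothetical stand-in for kernelTester: sums the task ids it receives.
long long sumTasks(std::vector<int> tasks)
{
    return std::accumulate(tasks.begin(), tasks.end(), 0LL);
}

// Round-robin split of the ids 0..taskCount-1 into `partitions` buckets.
std::vector<std::vector<int>> splitTasks(int taskCount, int partitions)
{
    assert(partitions > 0);
    std::vector<std::vector<int>> buckets(partitions);
    for (int i = 0; i < taskCount; ++i)
    {
        buckets[i % partitions].push_back(i);
    }
    return buckets;
}

// Launches one async task per bucket and collects the results in order.
std::vector<long long> runPartitions(int taskCount, int partitions)
{
    auto buckets = splitTasks(taskCount, partitions);

    std::vector<std::future<long long>> futures;
    for (auto &bucket : buckets)
    {
        // Loop-local copy, mirroring thisTask in the question.
        std::vector<int> thisTask = bucket;

        // The init-capture moves the vector into the closure, so the task
        // owns its data and nothing dangles when the loop-local thisTask
        // is destroyed at the end of this iteration.
        futures.emplace_back(std::async(std::launch::async,
            [thisTask = std::move(thisTask)]() mutable
            {
                return sumTasks(std::move(thisTask));
            }));
    }

    std::vector<long long> results;
    for (auto &f : futures)
    {
        results.push_back(f.get());
    }
    return results;
}
```

Calling runPartitions(10000, 1) corresponds to the single-partition case in the question. Capturing by value with [thisTask] would also avoid the dangling reference, at the cost of one extra copy of the vector per task.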

Upvotes: 2
