Ransom Erb
Ransom Erb

Reputation: 31

Broken promise, having trouble figuring it out (C++)

The error message I'm getting:

Unhandled exception at 0x7712A9F2 in eye_tracking.exe: Microsoft C++ exception: std::future_error at memory location 0x010FEA50.

Code snippet of where I fork and join:

//CONCURRENCE
std::vector<costGrad*> threadGrads;
std::vector<std::thread> threads;
std::vector<std::future<costGrad*>> ftr(maxThreads);

for (int i = 0; i < maxThreads; i++)    //Creating threads
{
    int start = floor(xValsB.rows() / (double)maxThreads * i);
    int end = floor(xValsB.rows() / (double)maxThreads * (i+1));
    int length = end-start;
    std::promise<costGrad*> prms;
    ftr[i] = prms.get_future();
    threads.push_back(std::thread([&]() 
    {
        costThread(std::move(prms), params, xValsB.block(start, 0, length, xValsB.cols()), yVals.block(start, 0, length, yVals.cols()), lambda, m); 
    }));
}

for (int i = 0; i < maxThreads; i++)    //Collecting future
    threadGrads.push_back(ftr[i].get()); <-------I THINK THIS IS WHERE I'M MESSING UP

for (int i = 0; i < maxThreads; i++)    //Joining threads
    threads[i].join();

Following is the costThread function:

void costThread(std::promise<costGrad*> && pmrs, 
                const std::vector<Eigen::MatrixXd>& params, 
                const Eigen::MatrixXd& xValsB, 
                const Eigen::MatrixXd& yVals, 
                const double lambda, 
                const int m) 
{
    try
    {
        costGrad* temp = new costGrad;      //"Cost / Gradient" struct to be returned at end

        temp->forw = 0;
        temp->back = 0;

        std::vector<Eigen::MatrixXd> matA;          //Contains the activation values including bias, first entry will be xVals
        std::vector<Eigen::MatrixXd> matAb;         //Contains the activation values excluding bias, first entry will be xVals
        std::vector<Eigen::MatrixXd> matZ;          //Contains the activation values prior to sigmoid
        std::vector<Eigen::MatrixXd> paramTrunc;    //Contains the parameters exluding bias terms

        clock_t t1, t2, t3;
        t1 = clock();

        //FORWARD PROPAGATION PREP

        Eigen::MatrixXd xVals = Eigen::MatrixXd::Constant(xValsB.rows(), xValsB.cols() + 1, 1); //Add bias units onto xVal
        xVals.block(0, 1, xValsB.rows(), xValsB.cols()) = xValsB;

        matA.push_back(xVals);
        matAb.push_back(xValsB);

        //FORWARD PROPAGATION

        for (int i = 0; i < params.size(); i++)
        {
            Eigen::MatrixXd paramTemp = params[i].block(0, 1, params[i].rows(), params[i].cols() - 1);      //Setting up paramTrunc

            paramTrunc.push_back(paramTemp);

            matZ.push_back(matA.back() * params[i].transpose());
            matAb.push_back(sigmoid(matZ.back()));


            Eigen::MatrixXd tempA = Eigen::MatrixXd::Constant(matAb.back().rows(), matAb.back().cols() + 1, 1); //Add bias units
            tempA.block(0, 1, matAb.back().rows(), matAb.back().cols()) = matAb.back();

            matA.push_back(tempA);
        }

        t2 = clock();

        //COST CALCULATION

        temp->J = (yVals.array()*(0 - log(matAb.back().array())) - (1 - yVals.array())*log(1 - matAb.back().array())).sum() / m;

        //BACK PROPAGATION

        std::vector<Eigen::MatrixXd> del;
        std::vector<Eigen::MatrixXd> grad;

        del.push_back(matAb.back() - yVals);

        for (int i = 0; i < params.size() - 1; i++)
        {
            del.push_back((del.back() * paramTrunc[paramTrunc.size() - 1 - i]).array() * sigmoidGrad(matZ[matZ.size() - 2 - i]).array());
        }
        for (int i = 0; i < params.size(); i++)
        {
            grad.push_back(del.back().transpose() * matA[i] / m);
            del.pop_back();
        }
        for (int i = 0; i < params.size(); i++)
        {
            int rws = grad[i].rows();
            int cls = grad[i].cols() - 1;
            Eigen::MatrixXd tmp = grad[i].block(0, 1, rws, cls);
            grad[i].block(0, 1, rws, cls) = tmp.array() + lambda / m*paramTrunc[i].array();
        }

        temp->grad = grad;

        t3 = clock();

        temp->forw = ((float)t2 - (float)t1) / 1000;
        temp->back = ((float)t3 - (float)t2) / 1000;

        pmrs.set_value(temp);
    }

    catch (...)
    {
        pmrs.set_exception(std::current_exception());
    }
    //return temp;
}

EDIT:

Figured out the exception is a broken promise. I'm still having problems understanding what I'm getting wrong here. At the end of costThread() I use

pmrs.set_value(temp);

And I expect the following to get temp:

for (int i = 0; i < maxThreads; i++)    //Collecting future
    threadGrads.push_back(ftr[i].get());

But somehow I'm getting it all wrong.

Upvotes: 3

Views: 7807

Answers (1)

Dmitry
Dmitry

Reputation: 3107

You have a race condition: you are passing a local variable to a thread by reference and moving it inside the thread; it will work only if a new thread manages to execute the move statement before local variable gets destructed due to going out of the scope. Normally, given the code, the destructor would be faster.

If you can use C++14, you can move the promise in the lambda initializer:

threads.push_back(
    std::thread([prms=std::move(prms)]() {
        costThread(prms, /* etc */);
    })
);

If you're limited to C++11, wrap the promise into a std::shared_ptr and pass it by value.

I would additionally handle exceptions in the worker threads and pass them to the processing thread through std::promise::set_exception(), though it's a matter of preference.

Upvotes: 5

Related Questions