Reputation: 31
The error message I'm getting:
Unhandled exception at 0x7712A9F2 in eye_tracking.exe: Microsoft C++ exception: std::future_error at memory location 0x010FEA50.
Code snippet of where I fork and join:
//CONCURRENCE
std::vector<costGrad*> threadGrads;
std::vector<std::thread> threads;
std::vector<std::future<costGrad*>> ftr(maxThreads);
for (int i = 0; i < maxThreads; i++) //Creating threads
{
int start = floor(xValsB.rows() / (double)maxThreads * i);
int end = floor(xValsB.rows() / (double)maxThreads * (i+1));
int length = end-start;
std::promise<costGrad*> prms;
ftr[i] = prms.get_future();
threads.push_back(std::thread([&]()
{
costThread(std::move(prms), params, xValsB.block(start, 0, length, xValsB.cols()), yVals.block(start, 0, length, yVals.cols()), lambda, m);
}));
}
for (int i = 0; i < maxThreads; i++) //Collecting future
threadGrads.push_back(ftr[i].get()); <-------I THINK THIS IS WHERE I'M MESSING UP
for (int i = 0; i < maxThreads; i++) //Joining threads
threads[i].join();
Following is the costThread
function:
void costThread(std::promise<costGrad*> && pmrs,
const std::vector<Eigen::MatrixXd>& params,
const Eigen::MatrixXd& xValsB,
const Eigen::MatrixXd& yVals,
const double lambda,
const int m)
{
try
{
costGrad* temp = new costGrad; //"Cost / Gradient" struct to be returned at end
temp->forw = 0;
temp->back = 0;
std::vector<Eigen::MatrixXd> matA; //Contains the activation values including bias, first entry will be xVals
std::vector<Eigen::MatrixXd> matAb; //Contains the activation values excluding bias, first entry will be xVals
std::vector<Eigen::MatrixXd> matZ; //Contains the activation values prior to sigmoid
std::vector<Eigen::MatrixXd> paramTrunc; //Contains the parameters exluding bias terms
clock_t t1, t2, t3;
t1 = clock();
//FORWARD PROPAGATION PREP
Eigen::MatrixXd xVals = Eigen::MatrixXd::Constant(xValsB.rows(), xValsB.cols() + 1, 1); //Add bias units onto xVal
xVals.block(0, 1, xValsB.rows(), xValsB.cols()) = xValsB;
matA.push_back(xVals);
matAb.push_back(xValsB);
//FORWARD PROPAGATION
for (int i = 0; i < params.size(); i++)
{
Eigen::MatrixXd paramTemp = params[i].block(0, 1, params[i].rows(), params[i].cols() - 1); //Setting up paramTrunc
paramTrunc.push_back(paramTemp);
matZ.push_back(matA.back() * params[i].transpose());
matAb.push_back(sigmoid(matZ.back()));
Eigen::MatrixXd tempA = Eigen::MatrixXd::Constant(matAb.back().rows(), matAb.back().cols() + 1, 1); //Add bias units
tempA.block(0, 1, matAb.back().rows(), matAb.back().cols()) = matAb.back();
matA.push_back(tempA);
}
t2 = clock();
//COST CALCULATION
temp->J = (yVals.array()*(0 - log(matAb.back().array())) - (1 - yVals.array())*log(1 - matAb.back().array())).sum() / m;
//BACK PROPAGATION
std::vector<Eigen::MatrixXd> del;
std::vector<Eigen::MatrixXd> grad;
del.push_back(matAb.back() - yVals);
for (int i = 0; i < params.size() - 1; i++)
{
del.push_back((del.back() * paramTrunc[paramTrunc.size() - 1 - i]).array() * sigmoidGrad(matZ[matZ.size() - 2 - i]).array());
}
for (int i = 0; i < params.size(); i++)
{
grad.push_back(del.back().transpose() * matA[i] / m);
del.pop_back();
}
for (int i = 0; i < params.size(); i++)
{
int rws = grad[i].rows();
int cls = grad[i].cols() - 1;
Eigen::MatrixXd tmp = grad[i].block(0, 1, rws, cls);
grad[i].block(0, 1, rws, cls) = tmp.array() + lambda / m*paramTrunc[i].array();
}
temp->grad = grad;
t3 = clock();
temp->forw = ((float)t2 - (float)t1) / 1000;
temp->back = ((float)t3 - (float)t2) / 1000;
pmrs.set_value(temp);
}
catch (...)
{
pmrs.set_exception(std::current_exception());
}
//return temp;
}
Figured out the exception is a broken promise. I'm still having problems understanding what I'm getting wrong here. At the end of costThread()
I use
pmrs.set_value(temp);
And I expect the following to get temp
:
for (int i = 0; i < maxThreads; i++) //Collecting future
threadGrads.push_back(ftr[i].get());
But somehow I'm getting it all wrong.
Upvotes: 3
Views: 7807
Reputation: 3107
You have a race condition: you are passing a local variable to a thread by reference and moving it inside the thread; it will work only if a new thread manages to execute the move statement before local variable gets destructed due to going out of the scope. Normally, given the code, the destructor would be faster.
If you can use C++14, you can move the promise in the lambda initializer:
threads.push_back(
std::thread([prms=std::move(prms)]() {
costThread(prms, /* etc */);
})
);
If you're limited to C++11, wrap the promise into a std::shared_ptr
and pass it by value.
I would additionally handle exceptions in the worker threads and pass them to the processing thread through std::promise::set_exception()
, though it's a matter of preference.
Upvotes: 5