Reputation: 16207
Things seem to be working but I'm unsure if this is the best way to go about it.
Basically I have an object which does asynchronous retrieval of data. This object has a vector of pointers which are allocated and de-allocated on the main thread. Using boost functions a process results callback is bound with one of the pointers in this vector. When it fires it will be running on some arbitrary thread and modify the data of the pointer.
Now I have critical sections around the parts that are pushing into the vector and erasing in case the asynch retrieval object is receives more requests but I'm wondering if I need some kind of guard in the callback that is modifying the pointer data as well.
Hopefully this slimmed down pseudo code makes things more clear:
class CAsyncRetriever
{
// typedefs of boost functions
class DataObject
{
// methods and members
};
public:
// Start single asynch retrieve with completion callback
void Start(SomeArgs)
{
SetupRetrieve(SomeArgs);
LaunchRetrieves();
}
protected:
void SetupRetrieve(SomeArgs)
{
// ...
{ // scope for data lock
boost::lock_guard<boost::mutex> lock(m_dataMutex);
m_inProgress.push_back(SmartPtr<DataObject>(new DataObject)));
m_callback = boost::bind(&CAsyncRetriever::ProcessResults, this, _1, m_inProgress.back());
}
// ...
}
void ProcessResults(DataObject* data)
{
// CALLED ON ANOTHER THREAD ... IS THIS SAFE?
data->m_SomeMember.SomeMethod();
data->m_SomeOtherMember = SomeStuff;
}
void Cleanup()
{
// ...
{ // scope for data lock
boost::lock_guard<boost::mutex> lock(m_dataMutex);
while(!m_inProgress.empty() && m_inProgress.front()->IsComplete())
m_inProgress.erase(m_inProgress.begin());
}
// ...
}
private:
std::vector<SmartPtr<DataObject>> m_inProgress;
boost::mutex m_dataMutex;
// other members
};
Edit: This is the actual code for the ProccessResults callback (plus comments for your benefit)
void ProcessResults(CRetrieveResults* pRetrieveResults, CRetData* data)
{
// pRetrieveResults is delayed binding that server passes in when invoking callback in thread pool
// data is raw pointer to ref counted object in vector of main thread (the DataObject* in question)
// if there was an error set the code on the atomic int in object
data->m_nErrorCode.Store_Release(pRetrieveResults->GetErrorCode());
// generic iterator of results bindings for generic sotrage class item
TPackedDataIterator<GenItem::CBind> dataItr(&pRetrieveResults->m_DataIter);
// namespace function which will iterate results and initialize generic storage
GenericStorage::InitializeItems<GenItem>(&data->m_items, dataItr, pRetrieveResults->m_nTotalResultsFound); // this is potentially time consuming depending on the amount of results and amount of columns that were bound in storage class definition (i.e.about 8 seconds for a million equipment items in release)
// atomic uint32_t that is incremented when kicking off async retrieve
m_nStarted.Decrement(); // this one is done processing
// boost function completion callback bound to interface that requested results
data->m_complete(data->m_items);
}
Upvotes: 3
Views: 562
Reputation: 884
No, it isn't safe.
ProcessResults
operates on the data structure passed to it through DataObject
. It indicates that you have shared state between different threads, and if both threads operate on the data structure concurrently you might have some trouble coming your way.
Upvotes: 2
Reputation: 54178
As it stands, it appears that the Cleanup
code can destroy an object for which a callback to ProcessResults
is in flight. That's going to cause problems when you deref the pointer in the callback.
My suggestion would be that you extend the semantics of your m_dataMutex
to encompass the callback, though if the callback is long-running, or can happen inline within SetupRetrieve
(sometimes this does happen - though here you state the callback is on a different thread, in which case you are OK) then things are more complex. Currently m_dataMutex
is a bit confused about whether it controls access to the vector, or its contents, or both. With its scope clarified, ProcessResults
could then be enhanced to verify validity of the payload within the lock.
Upvotes: 3
Reputation: 308452
Updating a pointer should be an atomic operation, but you can use InterlockedExchangePointer
(in Windows) to be sure. Not sure what the Linux equivalent would be.
The only consideration then would be if one thread is using an obsolete pointer. Does the other thread delete the object pointed to by the original pointer? If so, you have a definite problem.
Upvotes: 0