Reputation: 5243
void VideoRender::execute(){
for(int i = 0; i < 1000; i++)
udpateData(myPath, myIndex);
}
// Builds one FrameManager, fills it from two short-lived threads (one running
// loadGeometry, one running loadTextures), waits for both, then hands the
// container to m_fifo.
//
// i_obj_path  path forwarded to loadGeometry as a C string
// i_subIndex  sub-index forwarded to loadGeometry
//
// Two std::thread objects are created and joined on every call.
void VideoRender::updateData(const std::string &i_obj_path, const uint i_subIndex)
{
    auto container = std::make_shared<FrameManager>(m_nativeCodec);

    std::thread geometryWorker(&VideoRender::loadGeometry, this, i_obj_path.c_str(), i_subIndex, container);
    std::thread textureWorker(&VideoRender::loadTextures, this, container);

    // Both loads must be complete before the container is published.
    geometryWorker.join();
    textureWorker.join();

    m_fifo.enqueue(container);
}
The problem here is that every time the updateData
method is invoked, 2 new threads are created.
So, I decided to change this logic to something that uses std::condition_variable
What I have done
1) Created ConditionEvent
class
h.file
namespace hello_ar
{
    // Small wrapper pairing a mutex with a condition variable so that one
    // thread can block in wait() until another thread calls notify().
    class ConditionEvent
    {
    public:
        ConditionEvent() = default;
        ~ConditionEvent() = default;

        // Blocks the calling thread on the condition variable.
        void wait();

        // Wakes every thread currently blocked in wait().
        void notify();

    private:
        // Mutex associated with m_condition.
        mutable std::mutex m_mutex;
        // Condition variable that wait() blocks on and notify() signals.
        std::condition_variable m_condition;
    };
}
cc file
#include "ConditionEvent.h"
#include <android/log.h>
namespace hello_ar
{
void ConditionEvent::wait()
{
std::unique_lock<std::mutex> lock(m_mutex);
m_condition.wait(lock);
}
void ConditionEvent::notify()
{
std::lock_guard<std::mutex> lock(m_mutex);
m_condition.notify_all();
}
}
Then I created an UploadLooper
class
h file
namespace hello_ar
{
    // Owns one worker thread that processes load tasks taken from a queue.
    // Each task carries the FrameManager to fill and a ConditionEvent that is
    // notified once the task has run.
    class UploadLooper
    {
    public:
        UploadLooper() = default;
        ~UploadLooper();

        // Launches the worker thread if it is not already running.
        void start();

        // Queues a texture-load task for `container`; `condition` is notified
        // when the task has been processed.
        void enqueueLoadTextureTask(const std::shared_ptr<FrameManager> &container,
                                    std::shared_ptr<ConditionEvent> condition);

        // Queues a geometry-load task for `container`, reading from `filename`
        // at `subIndex`; `condition` is notified when the task has been processed.
        void enqueueLoadGeometryTask(const std::shared_ptr<FrameManager> &container,
                                     char const *filename,
                                     const uint subIndex,
                                     std::shared_ptr<ConditionEvent> condition);

        // Asks the worker to stop and joins it.
        void quit();

    private:
        // Kind of work a queued message requests.
        enum class Task
        {
            LoadGeometry,
            LoadTexture,
            ERROR
        };

        // One queued unit of work.
        struct LooperMessage
        {
            std::shared_ptr<FrameManager> m_container;
            std::shared_ptr<ConditionEvent> m_condition;
            // Borrowed pointer, not copied: the string must stay alive until
            // the task has been processed.
            char const *m_filename;
            uint m_subIndex;
            Task m_task;

            // Geometry-load form: every field is supplied explicitly.
            LooperMessage(std::shared_ptr<FrameManager> container,
                          std::shared_ptr<ConditionEvent> condition,
                          char const *filename = "",
                          const uint subIndex = static_cast<const uint>(-1),
                          Task task = Task::ERROR)
                : m_container(container),
                  m_condition(condition),
                  m_filename(filename),
                  m_subIndex(subIndex),
                  m_task(task)
            {
            }

            // Texture-load form: delegates with an empty filename and the
            // all-ones sub-index.
            LooperMessage(std::shared_ptr<FrameManager> container,
                          std::shared_ptr<ConditionEvent> condition,
                          Task task = Task::ERROR)
                : LooperMessage(container, condition, "", static_cast<uint>(-1), task)
            {
            }
        };

        // Pending tasks; a null entry tells the worker to exit.
        safe_queue<std::shared_ptr<LooperMessage>> m_fifo;
        std::thread m_worker;

        // Worker body: processes queued messages until the null entry arrives.
        void loop();
        // Thread entry point; forwards to loop() on the object passed as `p`.
        void trampoline(void *p);
        // Joins the worker thread if it is joinable.
        void releaseWorker();
    };
}
cc file
namespace hello_ar
{
UploadLooper::~UploadLooper()
{
quit();
}
void UploadLooper::releaseWorker()
{
if (m_worker.joinable())
m_worker.join();
}
void UploadLooper::trampoline(void *p)
{
((UploadLooper *) p)->loop();
}
void UploadLooper::loop()
{
while (true)
{
if (m_fifo.empty())
continue;
std::shared_ptr<LooperMessage> msg = m_fifo.dequeue();
if (!msg)
{
return;
}
switch (msg->m_task)
{
case Task::LoadGeometry:
{
msg->m_container->LoadFrameData(msg->m_filename, msg->m_subIndex);
msg->m_condition->notify();
}
break;
case Task::LoadTexture:
{
msg->m_container->LoadImage();
msg->m_condition->notify();
}
break;
case Task::ERROR:
break;
}
std::this_thread::yield();
}
}
void UploadLooper::enqueueLoadTextureTask(const std::shared_ptr<FrameManager> &container, std::shared_ptr<ConditionEvent> condition)
{
std::shared_ptr<LooperMessage> msg = std::make_shared<LooperMessage>(container, condition, Task::LoadTexture);
m_fifo.enqueue(msg);
}
void UploadLooper::enqueueLoadGeometryTask(const std::shared_ptr<FrameManager> &container, //
char const *filename, const uint subIndex, //
std::shared_ptr<ConditionEvent> condition)
{
std::shared_ptr<LooperMessage> msg = std::make_shared<LooperMessage>(container, condition, filename, subIndex, Task::LoadGeometry);
m_fifo.enqueue(msg);
}
void UploadLooper::quit()
{
m_fifo.enqueue(nullptr);
releaseWorker();
}
void UploadLooper::start()
{
if (!m_worker.joinable())
{
std::thread t(&UploadLooper::trampoline, this, this);
m_worker = std::move(t);
}
}
}
Eventually my resulting implementation looks like this
void VideoRender::execute(){
for(int i = 0; i < 1000; i++)
udpateData(myPath, myIndex);
}
void VideoRender::updateData(const std::string &i_obj_path, const uint i_subIndex)
{
std::shared_ptr<FrameManager> container = std::make_shared<FrameManager>(m_nativeCodec);
std::shared_ptr<ConditionEvent> texCond = std::make_shared<ConditionEvent>();
std::shared_ptr<ConditionEvent> geoCond = std::make_shared<ConditionEvent>();
m_texLopper.enqueueLoadTextureTask(container, texCond);
m_geometryLopper.enqueueLoadGeometryTask(container, i_obj_path.c_str(), i_subIndex, geoCond);
texCond->wait();
geoCond->wait();
m_fifo.enqueue(container);
}
But after debugging, I found that the first time I invoke the updateData
method, m_texLopper
calls notify
, then m_geometryLopper
calls notify
, and only after that does execution reach texCond->wait()
... even though the loopers run in separate threads.
What am I doing wrong?
EDIT
The problem is that, as I understand it, it should be impossible for notify
to be invoked before wait
, because according to the implementation I push a task to the looper (task execution time is 30 milliseconds) and the next line is wait
. So, I push the task to a separate thread -> next line wait
-> after 30 milliseconds notify
... But it works like this: push task -> after 30 milliseconds notify -> wait
... How is this possible?
Upvotes: 0
Views: 1417
Reputation: 2849
All you need to do is add a variable to ConditionEvent
, i.e.
bool notified = false; // set to true by notify() under m_mutex; tested by wait()'s predicate
Then use this variable:
// Blocks until `notified` is true. The flag is tested before sleeping and
// again after every wakeup, so a notify() issued before this call is still
// observed and a spurious wakeup does not end the wait.
void ConditionEvent::wait()
{
    std::unique_lock<std::mutex> guard(m_mutex);
    while (!notified)
    {
        m_condition.wait(guard);
    }
}
void ConditionEvent::notify()
{
std::lock_guard<std::mutex> lock(m_mutex);
notified = true;
m_condition.notify_all();
}
Edit: Fixed lambda.
Upvotes: 2
Reputation: 10155
It is not impossible for notify()
to be invoked before wait()
. When you are using multiple threads, their execution can start and stop at any time. Your task executes very quickly, so it is reasonable that the first thread might not continue execution before the second thread has finished.
You are expecting this:
Thread 1        Thread 2
enqueue
wait            dequeue
                LoadFrameData()
                notify
But this is also possible:
Thread 1        Thread 2
enqueue
                dequeue
                LoadFrameData()
                notify
wait
You should add a flag that records whether the condition variable has already been notified, and have wait() check it. The whole code can be simplified like this:
// Resettable event built on a mutex, a condition variable and a boolean flag.
// Because the flag is remembered, a set() that happens before wait() is not
// lost: wait() returns immediately when the event is already set.
class ConditionEvent {
public:
    // Blocks until the event is set. The predicate is re-checked after every
    // wakeup, so spurious wakeups do not end the wait.
    // (Member functions defined inside the class must not carry the
    // `ConditionEvent::` qualification.)
    void wait() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_condition.wait(lock, [this]() { return notified; });
    }

    // Marks the event as set and wakes every waiting thread.
    void set() {
        std::lock_guard<std::mutex> lock(m_mutex);
        notified = true;
        m_condition.notify_all();
    }

    // Clears the event so that later wait() calls block again. The flag is
    // written under the mutex because wait() and set() access it concurrently.
    void reset() {
        std::lock_guard<std::mutex> lock(m_mutex);
        notified = false;
    }

private:
    mutable std::mutex m_mutex;
    bool notified = false;          // guarded by m_mutex
    std::condition_variable m_condition;
};
void VideoRender::execute() {
std::shared_ptr<FrameManager> container;
ConditionEvent geometry, textures;
// If this thread obtains the lock, initialize the container
auto init = [&]() {
std::lock_guard<std::mutex> lock(containerMutex);
if (!container) {
container = std::make_shared<FrameManager>(m_nativeCodec);
}
};
// If this thread obtains the lock, enqueue the container
auto enqueue = [&]() {
std::lock_guard<std::mutex> lock(containerMutex);
if (container) {
m_fifo.enqueue(container);
container.reset();
geometry.reset();
textures.reset();
}
};
std::thread t1([]() {
for (int i = 0; i < 1000; i++) {
init();
loadGeometry();
geometry.set();
textures.wait();
enqueue();
}
});
std::thread t2([]() {
for (int i = 0; i < 1000; i++) {
init();
loadTextures();
textures.set();
geometry.wait();
enqueue();
}
});
t1.join();
t2.join();
}
Upvotes: 2