Reputation: 11
Consider the following simplified program with a Producer
and a Consumer
. The latter is notified whenever an int
is available from the former (in this simple case, every second). The notification is done by pushing the value into a boost::fibers::unbuffered_channel
owned by the Consumer
, which is expected to pop the value from the channel once it is pushed. Pushing and popping are done in two different fibers. The problem is that for some reasons the Consumer
blocks on pop() forever. Do you know why and a possible fix without modifying too much the fundamental architecture based on a signal, a channel and two fibers? Here is a link to the full source code that prints some debugging messages to better understand the program flow. If you try to run the program, it gets automatically killed after some time of inactivity.
#include <boost/fiber/all.hpp>
#include <boost/signals2/signal.hpp>
using channel_t = boost::fibers::unbuffered_channel<int>;
namespace fibers = boost::fibers;
struct Producer {
boost::signals2::signal<void(int)> sig;
void notify() {
std::cout << "enter Producer::notify()\n";
for (int i = 0; i < 10; i++ ) {
sig(i);
std::this_thread::sleep_for(std::chrono::seconds{1});
}
std::cout << "complete Producer::notify()\n";
}
void start() {
std::cout << "enter Producer::start()\n";
fibers::fiber(fibers::launch::dispatch, &Producer::notify, this).detach();
std::cout << "complete Producer::start()\n";
}
};
struct Consumer {
channel_t chan;
Consumer() {
std::cout << "enter Consumer()\n";
fibers::fiber(fibers::launch::dispatch, &Consumer::start, this).detach();
std::cout << "complete Consumer()\n";
}
void start() {
std::cout << "enter Consumer::start()\n";
int i;
chan.pop(i);
std::cout << "complete Consumer::start()\n";
}
void operator()(int i) {
std::cout << "enter Consumer::operator()\n";
chan.push(i);
std::cout << "complete Consumer::operator()\n";
}
};
int main() {
Consumer c;
Producer p;
p.sig.connect(std::ref(c));
p.start();
std::cout << "done." << std::endl;
return EXIT_SUCCESS;
}
Upvotes: 1
Views: 97
Reputation: 393557
First off, doing this_thread::sleep_for
is really an antipattern in "green threads": make that this_fiber::sleep_for
.
Next, the choice for dispatch
means that the current logical thread of execution is immediately suspended in favor of the launched fiber. This makes it so that you may not be starting both ends of the channel.
Now because all the fibers are detached, in your main, you are destructing the Consumer/Producer instances before their fibers completed.
Demonstrating:
std::endl;
for immediacy of outputstart
(launching functions) vs. run_fiber
#include <boost/fiber/all.hpp>
#include <boost/signals2/signal.hpp>
using namespace std::chrono_literals;
using boost::this_fiber::sleep_for;
using channel_t = boost::fibers::unbuffered_channel<int>;
using boost::fibers::fiber;
static constexpr auto how = boost::fibers::launch::dispatch;
struct tracer_t {
using S = char const*;
S n, e, l;
tracer_t(S n, S e, S l) : n(n), e(e), l(l) {
std::cout << e << " " << n << std::endl;
}
~tracer_t() { std::cout << l << " " << n << std::endl; }
};
struct fun_tracer_t : tracer_t { fun_tracer_t(S n) : tracer_t(n, "Enter", "Leave") {} };
struct obj_tracer_t : tracer_t { obj_tracer_t(S n) : tracer_t(n, "Construct", "Destruct") {} };
#define FUNTRACE fun_tracer_t _local(__PRETTY_FUNCTION__)
struct Producer {
obj_tracer_t _lifetime{"Producer"};
boost::signals2::signal<void(int)> sig;
void start() {
FUNTRACE;
fiber(how, &Producer::fiber_run, this).detach();
}
void fiber_run() {
FUNTRACE;
for (int i = 0; i < 10; i++) {
sig(i);
sleep_for(1s);
}
}
};
bool checked(boost::fibers::channel_op_status s) {
using S = boost::fibers::channel_op_status;
std::cout << " -> channel_op_status: " << [s] {
switch (s) {
case S::closed: return "Closed";
case S::empty: return "Empty"; break;
case S::timeout: return "Timeout"; break;
case S::full: return "Full"; break;
case S::success: return "Success";
default: return "?";
};
}() << std::endl;
return s == S::success;
}
#define CHECKED(op) \
[&] { \
std::cout << "Channel operation: " << #op << std::endl; \
auto tmp = (op); \
return checked(tmp); \
}()
struct Consumer {
obj_tracer_t _lifetime{"Consumer"};
channel_t chan;
void start() {
FUNTRACE;
fiber(how, &Consumer::fiber_run, this).detach();
}
void fiber_run() {
FUNTRACE;
int i = 0;
CHECKED(chan.pop(i));
}
void operator()(int i) {
FUNTRACE;
CHECKED(chan.push(i));
}
};
int main() {
try {
FUNTRACE;
Consumer c;
Producer p;
p.sig.connect(std::ref(c));
p.start();
c.start();
std::cout << "done." << std::endl;
return 0;
} catch (std::exception const& e) {
std::cerr << "exception: " << e.what() << std::endl;
} catch (...) {
std::cerr << "unhandled exception" << std::endl;
}
std::cout << "exit." << std::endl;
return 1;
}
Prints on my machine:
Enter int main()
Construct Consumer
Construct Producer
Enter void Producer::start()
Enter void Producer::fiber_run()
Enter void Consumer::operator()(int)
Channel operation: chan.push(i)
Leave void Producer::start()
Enter void Consumer::start()
Enter void Consumer::fiber_run()
Channel operation: chan.pop(i)
-> channel_op_status: Success
Leave void Consumer::fiber_run()
Leave void Consumer::start()
done.
Destruct Producer
Destruct Consumer
Leave int main()
-> channel_op_status: Success
Leave void Consumer::operator()(int)
Segmentation fault (core dumped)
real 0m1,112s
user 0m0,999s
sys 0m0,003s
As you can see the clear problem is the lifetime of the Consumer/Producer objects. On Wandbox the output is slightly different (it may be due to a debug build of boost?)
... [skipped]
Destruct Producer
Destruct Consumer
Leave int main()
-> channel_op_status: Closed
Leave void Consumer::operator()(int)
prog.exe: /opt/wandbox/boost-1.79.0-gcc-12.1.0/include/boost/signals2/detail/lwm_pthreads.hpp:60: void boost::signals2::mutex::lock(): Assertion `pthread_mutex_lock(&m_) == 0' failed.
The time elapsed illustrates that things go awry as soon as the sleep_for(1s)
completes - after destruction. We can force the lifetime by inserting this_fiber::sleep_for(11s)
before destructing the objects.
Also added:
#include <boost/fiber/all.hpp>
#include <boost/signals2/signal.hpp>
#include <iomanip>
#include <iostream>
using namespace std::chrono_literals;
using boost::this_fiber::sleep_for;
using channel_t = boost::fibers::unbuffered_channel<int>;
using boost::fibers::fiber;
static constexpr auto how = boost::fibers::launch::dispatch;
static auto stamp() {
static const auto s = std::chrono::steady_clock::now();
return std::to_string((std::chrono::steady_clock::now() - s) / 1ms) + "ms";
}
static auto& log() { return std::cout << std::setw(7) << stamp() << " "; }
struct tracer_t {
using S = char const*;
S n, e, l;
tracer_t(S n, S e, S l) : n(n), e(e), l(l) {
log() << e << " " << n << std::endl;
}
~tracer_t() { log() << l << " " << n << std::endl; }
};
struct fun_tracer_t : tracer_t { fun_tracer_t(S n) : tracer_t(n, "Enter", "Leave") {} };
struct obj_tracer_t : tracer_t { obj_tracer_t(S n) : tracer_t(n, "Construct", "Destruct") {} };
#define FUNTRACE fun_tracer_t _local(__PRETTY_FUNCTION__)
struct Producer {
obj_tracer_t _lifetime{"Producer"};
boost::signals2::signal<void(int)> sig;
void start() {
FUNTRACE;
fiber(how, &Producer::fiber_run, this).detach();
}
void fiber_run() {
FUNTRACE;
for (int i = 0; i < 10; i++) {
sig(i);
sleep_for(1s);
}
}
};
bool checked(boost::fibers::channel_op_status s) {
using S = boost::fibers::channel_op_status;
log() << " -> channel_op_status: " << [s] {
switch (s) {
case S::closed: return "Closed";
case S::empty: return "Empty"; break;
case S::timeout: return "Timeout"; break;
case S::full: return "Full"; break;
case S::success: return "Success";
default: return "?";
};
}() << std::endl;
return s == S::success;
}
#define CHECKED(op) \
[&] { \
log() << "Channel operation: " << #op << std::endl; \
auto tmp = (op); \
return checked(tmp); \
}()
struct Consumer {
obj_tracer_t _lifetime{"Consumer"};
channel_t chan;
void start() {
FUNTRACE;
fiber(how, &Consumer::fiber_run, this).detach();
}
void fiber_run() {
FUNTRACE;
for (int i = 0; CHECKED(chan.pop(i));) {
log() << " -> popped: " << i << std::endl;
if (i == 9)
break;
}
}
void operator()(int i) {
FUNTRACE;
CHECKED(chan.push(i));
}
};
int main() {
try {
FUNTRACE;
Consumer c;
Producer p;
p.sig.connect(std::ref(c));
p.start();
c.start();
sleep_for(11s);
log() << "done." << std::endl;
return 0;
} catch (std::exception const& e) {
std::cerr << "exception: " << e.what() << std::endl;
} catch (...) {
std::cerr << "unhandled exception" << std::endl;
}
log() << "exit." << std::endl;
return 1;
}
Prints
0ms Enter int main()
0ms Construct Consumer
0ms Construct Producer
0ms Enter void Producer::start()
0ms Enter void Producer::fiber_run()
0ms Enter void Consumer::operator()(int)
0ms Channel operation: chan.push(i)
0ms Leave void Producer::start()
0ms Enter void Consumer::start()
0ms Enter void Consumer::fiber_run()
0ms Channel operation: chan.pop(i)
0ms -> channel_op_status: Success
0ms -> popped: 0
0ms Channel operation: chan.pop(i)
0ms Leave void Consumer::start()
0ms -> channel_op_status: Success
0ms Leave void Consumer::operator()(int)
1000ms Enter void Consumer::operator()(int)
1000ms Channel operation: chan.push(i)
1000ms -> channel_op_status: Success
1000ms -> popped: 1
1000ms Channel operation: chan.pop(i)
1000ms -> channel_op_status: Success
1000ms Leave void Consumer::operator()(int)
2000ms Enter void Consumer::operator()(int)
2000ms Channel operation: chan.push(i)
2000ms -> channel_op_status: Success
2000ms -> popped: 2
2000ms Channel operation: chan.pop(i)
2000ms -> channel_op_status: Success
2000ms Leave void Consumer::operator()(int)
3000ms Enter void Consumer::operator()(int)
3000ms Channel operation: chan.push(i)
3000ms -> channel_op_status: Success
3000ms -> popped: 3
3000ms Channel operation: chan.pop(i)
3000ms -> channel_op_status: Success
3000ms Leave void Consumer::operator()(int)
4000ms Enter void Consumer::operator()(int)
4000ms Channel operation: chan.push(i)
4000ms -> channel_op_status: Success
4000ms -> popped: 4
4000ms Channel operation: chan.pop(i)
4000ms -> channel_op_status: Success
4000ms Leave void Consumer::operator()(int)
5001ms Enter void Consumer::operator()(int)
5001ms Channel operation: chan.push(i)
5001ms -> channel_op_status: Success
5001ms -> popped: 5
5001ms Channel operation: chan.pop(i)
5001ms -> channel_op_status: Success
5001ms Leave void Consumer::operator()(int)
6001ms Enter void Consumer::operator()(int)
6001ms Channel operation: chan.push(i)
6001ms -> channel_op_status: Success
6001ms -> popped: 6
6001ms Channel operation: chan.pop(i)
6001ms -> channel_op_status: Success
6001ms Leave void Consumer::operator()(int)
7001ms Enter void Consumer::operator()(int)
7001ms Channel operation: chan.push(i)
7001ms -> channel_op_status: Success
7001ms -> popped: 7
7001ms Channel operation: chan.pop(i)
7001ms -> channel_op_status: Success
7001ms Leave void Consumer::operator()(int)
8001ms Enter void Consumer::operator()(int)
8001ms Channel operation: chan.push(i)
8001ms -> channel_op_status: Success
8001ms -> popped: 8
8001ms Channel operation: chan.pop(i)
8001ms -> channel_op_status: Success
8001ms Leave void Consumer::operator()(int)
9001ms Enter void Consumer::operator()(int)
9001ms Channel operation: chan.push(i)
9001ms -> channel_op_status: Success
9001ms -> popped: 9
9001ms Leave void Consumer::fiber_run()
9002ms -> channel_op_status: Success
9002ms Leave void Consumer::operator()(int)
10002ms Leave void Producer::fiber_run()
11000ms done.
11000ms Destruct Producer
11000ms Destruct Consumer
11000ms Leave int main()
That doesn't seem to scale. Right. You might want to
std::enable_shared_from_this
to automatically govern lifetimes even though you detach the fibersjoin
them before destruction of the related objects9
in this example) to signal the endCombining some of the 2. and 3. here. Note also the commented bit to test for forced early shutdown:
//// E.g. for forced shutdown:
// { sleep_for(4s); c.close(); }
#include <boost/fiber/all.hpp>
#include <boost/signals2/signal.hpp>
#include <iomanip>
#include <iostream>
using namespace std::chrono_literals;
using boost::this_fiber::sleep_for;
using channel_t = boost::fibers::unbuffered_channel<int>;
using boost::fibers::fiber;
static constexpr auto how = boost::fibers::launch::dispatch;
static auto stamp() {
static const auto s = std::chrono::steady_clock::now();
return std::to_string((std::chrono::steady_clock::now() - s) / 1ms) + "ms";
}
static auto& log(auto const&... args) {
return ((std::cout << std::setw(7) << stamp() << " ") << ... << args)
<< std::endl;
}
struct tracer_t {
using S = char const*;
S n, e, l;
tracer_t(S n, S e, S l) : n(n), e(e), l(l) { log(e, " ", n); }
~tracer_t() { log(l, " ", n); }
};
struct fun_tracer_t : tracer_t { fun_tracer_t(S n) : tracer_t(n, "Enter", "Leave") {} };
struct obj_tracer_t : tracer_t { obj_tracer_t(S n) : tracer_t(n, "Construct", "Destruct") {} };
#define FUNTRACE fun_tracer_t _local(__PRETTY_FUNCTION__)
static constexpr int SENTINEL = -99;
struct Producer {
obj_tracer_t _lifetime{"Producer"};
boost::signals2::signal<bool(int)> _sig;
fiber _fib;
~Producer() {
FUNTRACE;
if (_fib.joinable())
_fib.join();
}
void start() {
FUNTRACE;
_fib = {how, &Producer::fiber_run, this};
}
void fiber_run() {
FUNTRACE;
for (int i = 0; i < 10; i++) {
if (!_sig(i).value_or(false)) {
log("Aborting producer");
return;
}
sleep_for(1s);
}
log("Closing producer");
_sig(SENTINEL);
}
};
bool checked(boost::fibers::channel_op_status s) {
using S = boost::fibers::channel_op_status;
log(" -> channel_op_status: ", [s] {
switch (s) {
case S::closed: return "Closed";
case S::empty: return "Empty"; break;
case S::timeout: return "Timeout"; break;
case S::full: return "Full"; break;
case S::success: return "Success";
default: return "?";
};
}());
return s == S::success;
}
#define CHECKED(op) \
[&] { \
log("Channel operation: ", #op); \
auto tmp = (op); \
return checked(tmp); \
}()
struct Consumer {
obj_tracer_t _lifetime{"Consumer"};
channel_t _chan;
fiber _fib;
~Consumer() {
FUNTRACE;
close();
if (_fib.joinable())
_fib.join();
}
void start() { FUNTRACE;
_fib = {how, &Consumer::fiber_run, this};
}
void close() {
if (!_chan.is_closed())
_chan.close();
}
void fiber_run() {
FUNTRACE;
for (int i = 0; CHECKED(_chan.pop(i));) {
log(" -> popped: ", i);
if (i == SENTINEL)
break;
}
close();
}
bool operator()(int i) {
FUNTRACE;
return CHECKED(_chan.push(i));
}
};
int main() {
try {
FUNTRACE;
Consumer c;
Producer p;
p._sig.connect(std::ref(c));
p.start();
c.start();
log("done.");
//// E.g. for forced shutdown:
// { sleep_for(4s); c.close(); }
return 0;
} catch (std::exception const& e) {
log("exception: ", e.what());
} catch (...) {
log("unhandled exception");
}
log("exit.");
return 1;
}
Printing the expected
0ms Enter int main()
0ms Construct Consumer
0ms Construct Producer
0ms Enter void Producer::start()
0ms Enter void Producer::fiber_run()
0ms Enter bool Consumer::operator()(int)
0ms Channel operation: _chan.push(i)
0ms Leave void Producer::start()
0ms Enter void Consumer::start()
0ms Enter void Consumer::fiber_run()
0ms Channel operation: _chan.pop(i)
0ms -> channel_op_status: Success
0ms -> popped: 0
// ...
10002ms Closing producer
10002ms Enter bool Consumer::operator()(int)
10002ms Channel operation: _chan.push(i)
10002ms -> channel_op_status: Success
10002ms -> popped: -99
10002ms Leave void Consumer::fiber_run()
10002ms -> channel_op_status: Closed
10002ms Leave bool Consumer::operator()(int)
10002ms Leave void Producer::fiber_run()
10002ms Leave Producer::~Producer()
10002ms Destruct Producer
10002ms Enter Consumer::~Consumer()
10002ms Leave Consumer::~Consumer()
10002ms Destruct Consumer
10002ms Leave int main()
Upvotes: 1