AntVival
AntVival

Reputation: 11

Why doesn't this simple Producer-Consumer program using two boost::fibers work as expected?

Consider the following simplified program with a Producer and a Consumer. The latter is notified whenever an int is available from the former (in this simple case, every second). The notification is done by pushing the value into a boost::fibers::unbuffered_channel owned by the Consumer, which is expected to pop the value from the channel once it is pushed. Pushing and popping are done in two different fibers. The problem is that, for some reason, the Consumer blocks on pop() forever. Do you know why, and is there a possible fix that does not modify the fundamental architecture (a signal, a channel and two fibers) too much? Here is a link to the full source code, which prints some debugging messages to make the program flow easier to follow. If you try to run the program, it gets automatically killed after some time of inactivity.

#include <boost/fiber/all.hpp>
#include <boost/signals2/signal.hpp>

using channel_t = boost::fibers::unbuffered_channel<int>;
namespace fibers = boost::fibers;

// Publishes the integers 0..9 through `sig`, one per second, on its own fiber.
struct Producer {
    // Fired once per produced value, with that value as the argument.
    boost::signals2::signal<void(int)> sig;
    // Fiber body: invokes `sig` for i = 0..9, pausing one second after each call.
    void notify() {
        std::cout << "enter Producer::notify()\n";
        for (int i = 0; i < 10; i++ ) {
            sig(i);
            // NOTE(review): std::this_thread::sleep_for pauses the whole thread, not
            // just this fiber; boost::this_fiber::sleep_for is the fiber-level call.
            std::this_thread::sleep_for(std::chrono::seconds{1});
        }
        std::cout << "complete Producer::notify()\n";
    }
    // Launches notify() on a new fiber using the dispatch policy, then detaches it.
    void start() {
        std::cout << "enter Producer::start()\n";
        fibers::fiber(fibers::launch::dispatch, &Producer::notify, this).detach();
        std::cout << "complete Producer::start()\n";
    }        
};

// Receives values through `chan`; operator() is the callable that feeds the channel.
struct Consumer {
    channel_t chan;
    // Launches start() on a detached fiber as a side effect of construction.
    Consumer() {
        std::cout << "enter Consumer()\n";
        fibers::fiber(fibers::launch::dispatch, &Consumer::start, this).detach();
        std::cout << "complete Consumer()\n";
    }
    // Fiber body: pops a single value from `chan`, then returns.
    // NOTE(review): only one pop is performed, so later pushes presumably find
    // no receiver -- confirm against the intended design.
    void start() {
        std::cout << "enter Consumer::start()\n";
        int i;
        chan.pop(i);
        std::cout << "complete Consumer::start()\n";
    }
    // Pushes `i` into `chan`; called with each value the connected signal emits.
    void operator()(int i) {
       std::cout << "enter Consumer::operator()\n";
       chan.push(i); 
       std::cout << "complete Consumer::operator()\n";
    }    
};

// Connects the Consumer to the Producer's signal and starts the producer fiber.
int main() {
    Consumer c;
    Producer p;
    // std::ref: the signal stores a reference to `c` instead of a copy.
    p.sig.connect(std::ref(c));
    p.start();
    std::cout << "done." << std::endl;
    // NOTE(review): `c` and `p` go out of scope here although both fibers were
    // detached and were given `this`; nothing waits for them to finish.
    return EXIT_SUCCESS;
}

Upvotes: 1

Views: 97

Answers (1)

sehe
sehe

Reputation: 393557

First off, doing this_thread::sleep_for is really an antipattern in "green threads": make that this_fiber::sleep_for.

Next, the choice of dispatch means that the current logical thread of execution is immediately suspended in favor of the launched fiber. This makes it so that you may not be starting both ends of the channel.

Now, because all the fibers are detached, in your main you are destructing the Consumer/Producer instances before their fibers have completed.

Demonstrating:

  • the changes above
  • more consistent tracing
  • using flushing std::endl; for immediacy of output
  • not launching fibers from constructors
  • consistently naming start (launching functions) vs. fiber_run
  • making the lifetime of Producer/Consumer visible
  • checking and reporting the result of channel operations

Live On Wandbox

#include <boost/fiber/all.hpp>
#include <boost/signals2/signal.hpp>

using namespace std::chrono_literals;
using boost::this_fiber::sleep_for;
using channel_t = boost::fibers::unbuffered_channel<int>;
using boost::fibers::fiber;
// Launch policy passed to every fiber constructed in this listing.
static constexpr auto how = boost::fibers::launch::dispatch;

/// Scope tracer: prints "<e> <n>" when constructed and "<l> <n>" when destroyed.
///
/// The three strings are stored as raw pointers and are not copied, so they
/// must stay valid for the tracer's whole lifetime.
struct tracer_t {
    using S = char const*;
    S n; ///< name printed on both lines
    S e; ///< word printed in front of the name on construction
    S l; ///< word printed in front of the name on destruction

    tracer_t(S n, S e, S l) : n(n), e(e), l(l) {
        std::cout << e << " " << n << std::endl;
    }
    ~tracer_t() { std::cout << l << " " << n << std::endl; }

    // The destructor has a visible side effect, so a copy would print a second,
    // unmatched closing line. Forbid copying instead of allowing it silently.
    tracer_t(tracer_t const&)            = delete;
    tracer_t& operator=(tracer_t const&) = delete;
};

struct fun_tracer_t : tracer_t { fun_tracer_t(S n) : tracer_t(n, "Enter", "Leave") {} };
struct obj_tracer_t : tracer_t { obj_tracer_t(S n) : tracer_t(n, "Construct", "Destruct") {} };

// Declares a fun_tracer_t named `_local`, constructed from the enclosing
// function's __PRETTY_FUNCTION__ string; place it first in a function body.
#define FUNTRACE fun_tracer_t _local(__PRETTY_FUNCTION__)

struct Producer {
    obj_tracer_t _lifetime{"Producer"};
    boost::signals2::signal<void(int)> sig;

    void start() {
        FUNTRACE;
        fiber(how, &Producer::fiber_run, this).detach();
    }

    void fiber_run() {
        FUNTRACE;
        for (int i = 0; i < 10; i++) {
            sig(i);
            sleep_for(1s);
        }
    }
};

/// Prints the name of channel status `s` and returns true only for success.
bool checked(boost::fibers::channel_op_status s) {
    using S = boost::fibers::channel_op_status;

    // Any status not listed below is printed as "?".
    char const* label = "?";
    switch (s) {
    case S::closed:  label = "Closed";  break;
    case S::empty:   label = "Empty";   break;
    case S::timeout: label = "Timeout"; break;
    case S::full:    label = "Full";    break;
    case S::success: label = "Success"; break;
    default: break;
    }

    std::cout << " -> channel_op_status: " << label << std::endl;
    return S::success == s;
}

// CHECKED(op): prints the source text of `op`, then evaluates it and passes the
// resulting status to checked(); the whole expression yields checked()'s bool.
#define CHECKED(op)                                                            \
    [&] {                                                                      \
        std::cout << "Channel operation: " << #op << std::endl;                \
        return checked((op));                                                  \
    }()

struct Consumer {
    obj_tracer_t _lifetime{"Consumer"};

    channel_t chan;
    void start() {
        FUNTRACE;
        fiber(how, &Consumer::fiber_run, this).detach();
    }
    void fiber_run() {
        FUNTRACE;
        int i = 0;
        CHECKED(chan.pop(i));
    }
    void operator()(int i) {
        FUNTRACE;
        CHECKED(chan.push(i));
    }    
};

// Demo driver: connects the consumer to the producer's signal and starts both
// fibers. Returns 0 on the normal path and 1 if an exception was caught.
int main() {
    try {
        FUNTRACE;
        Consumer c;
        Producer p;

        // std::ref: the signal calls `c` itself rather than a copy of it.
        p.sig.connect(std::ref(c));

        p.start();
        c.start();

        std::cout << "done." << std::endl;
        // Returning destroys `p` and `c`; nothing here waits for the detached fibers.
        return 0;
    } catch (std::exception const& e) {
        std::cerr << "exception: " << e.what() << std::endl;
    } catch (...) {
        std::cerr << "unhandled exception" << std::endl;
    }
    // Reached only after one of the handlers above has run.
    std::cout << "exit." << std::endl;
    return 1;
}

Prints on my machine:

Enter int main()
Construct Consumer
Construct Producer
Enter void Producer::start()
Enter void Producer::fiber_run()
Enter void Consumer::operator()(int)
Channel operation: chan.push(i)
Leave void Producer::start()
Enter void Consumer::start()
Enter void Consumer::fiber_run()
Channel operation: chan.pop(i)
 -> channel_op_status: Success
Leave void Consumer::fiber_run()
Leave void Consumer::start()
done.
Destruct Producer
Destruct Consumer
Leave int main()
 -> channel_op_status: Success
Leave void Consumer::operator()(int)
Segmentation fault (core dumped)

real    0m1,112s
user    0m0,999s
sys     0m0,003s

As you can see the clear problem is the lifetime of the Consumer/Producer objects. On Wandbox the output is slightly different (it may be due to a debug build of boost?)

... [skipped]
Destruct Producer
Destruct Consumer
Leave int main()
 -> channel_op_status: Closed
Leave void Consumer::operator()(int)
prog.exe: /opt/wandbox/boost-1.79.0-gcc-12.1.0/include/boost/signals2/detail/lwm_pthreads.hpp:60: void boost::signals2::mutex::lock(): Assertion `pthread_mutex_lock(&m_) == 0' failed.

The time elapsed illustrates that things go awry as soon as the sleep_for(1s) completes - after destruction. We can force the lifetime by inserting this_fiber::sleep_for(11s) before destructing the objects.

Also added:

  • timestamps for log lines
  • looping the consumer fiber so it will not block the producer

Live On Wandbox

#include <boost/fiber/all.hpp>
#include <boost/signals2/signal.hpp>
#include <iomanip>
#include <iostream>

using namespace std::chrono_literals;
using boost::this_fiber::sleep_for;
using channel_t = boost::fibers::unbuffered_channel<int>;
using boost::fibers::fiber;
// Launch policy passed to every fiber constructed in this listing.
static constexpr auto how = boost::fibers::launch::dispatch;

/// Whole milliseconds elapsed since the first call to stamp(), as "<n>ms".
static auto stamp() {
    using clock = std::chrono::steady_clock;
    static const auto origin = clock::now(); // fixed on the first call
    const auto elapsed =
        std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() - origin);
    return std::to_string(elapsed.count()) + "ms";
}
/// Writes the timestamp prefix (width 7, then a space) to std::cout and
/// returns the stream so the caller can append the rest of the line.
static auto& log() {
    auto& out = std::cout;
    out << std::setw(7) << stamp() << " ";
    return out;
}

/// Scope tracer: logs "<e> <n>" when constructed and "<l> <n>" when destroyed.
///
/// The three strings are stored as raw pointers and are not copied, so they
/// must stay valid for the tracer's whole lifetime.
struct tracer_t {
    using S = char const*;
    S n; ///< name logged on both lines
    S e; ///< word logged in front of the name on construction
    S l; ///< word logged in front of the name on destruction

    tracer_t(S n, S e, S l) : n(n), e(e), l(l) {
        log() << e << " " << n << std::endl;
    }
    ~tracer_t() { log() << l << " " << n << std::endl; }

    // The destructor has a visible side effect, so a copy would log a second,
    // unmatched closing line. Forbid copying instead of allowing it silently.
    tracer_t(tracer_t const&)            = delete;
    tracer_t& operator=(tracer_t const&) = delete;
};

struct fun_tracer_t : tracer_t { fun_tracer_t(S n) : tracer_t(n, "Enter", "Leave") {} };
struct obj_tracer_t : tracer_t { obj_tracer_t(S n) : tracer_t(n, "Construct", "Destruct") {} };

// Declares a fun_tracer_t named `_local`, constructed from the enclosing
// function's __PRETTY_FUNCTION__ string; place it first in a function body.
#define FUNTRACE fun_tracer_t _local(__PRETTY_FUNCTION__)

struct Producer {
    obj_tracer_t _lifetime{"Producer"};
    boost::signals2::signal<void(int)> sig;

    void start() {
        FUNTRACE;
        fiber(how, &Producer::fiber_run, this).detach();
    }

    void fiber_run() {
        FUNTRACE;
        for (int i = 0; i < 10; i++) {
            sig(i);
            sleep_for(1s);
        }
    }
};

/// Logs the name of channel status `s` and returns true only for success.
bool checked(boost::fibers::channel_op_status s) {
    using S = boost::fibers::channel_op_status;

    // Any status not listed below is logged as "?".
    char const* label = "?";
    switch (s) {
    case S::closed:  label = "Closed";  break;
    case S::empty:   label = "Empty";   break;
    case S::timeout: label = "Timeout"; break;
    case S::full:    label = "Full";    break;
    case S::success: label = "Success"; break;
    default: break;
    }

    log() << " -> channel_op_status: " << label << std::endl;
    return S::success == s;
}

// CHECKED(op): logs the source text of `op`, then evaluates it and passes the
// resulting status to checked(); the whole expression yields checked()'s bool.
#define CHECKED(op)                                                            \
    [&] {                                                                      \
        log() << "Channel operation: " << #op << std::endl;                    \
        return checked((op));                                                  \
    }()

struct Consumer {
    obj_tracer_t _lifetime{"Consumer"};

    channel_t chan;
    void start() {
        FUNTRACE;
        fiber(how, &Consumer::fiber_run, this).detach();
    }
    void fiber_run() {
        FUNTRACE;
        for (int i = 0; CHECKED(chan.pop(i));) {
            log() << " -> popped: " << i << std::endl;
            if (i == 9)
                break;
        }
    }
    void operator()(int i) {
        FUNTRACE;
        CHECKED(chan.push(i));
    }    
};

// Demo driver: connects the consumer to the producer's signal, starts both
// fibers, then sleeps before returning. Returns 0 normally, 1 after an exception.
int main() {
    try {
        FUNTRACE;
        Consumer c;
        Producer p;

        // std::ref: the signal calls `c` itself rather than a copy of it.
        p.sig.connect(std::ref(c));

        p.start();
        c.start();

        // Delays the destruction of `p` and `c` by suspending this fiber for 11s.
        sleep_for(11s);
        log() << "done." << std::endl;
        return 0;
    } catch (std::exception const& e) {
        std::cerr << "exception: " << e.what() << std::endl;
    } catch (...) {
        std::cerr << "unhandled exception" << std::endl;
    }
    // Reached only after one of the handlers above has run.
    log() << "exit." << std::endl;
    return 1;
}

Prints

    0ms Enter int main()
    0ms Construct Consumer
    0ms Construct Producer
    0ms Enter void Producer::start()
    0ms Enter void Producer::fiber_run()
    0ms Enter void Consumer::operator()(int)
    0ms Channel operation: chan.push(i)
    0ms Leave void Producer::start()
    0ms Enter void Consumer::start()
    0ms Enter void Consumer::fiber_run()
    0ms Channel operation: chan.pop(i)
    0ms  -> channel_op_status: Success
    0ms  -> popped: 0
    0ms Channel operation: chan.pop(i)
    0ms Leave void Consumer::start()
    0ms  -> channel_op_status: Success
    0ms Leave void Consumer::operator()(int)
 1000ms Enter void Consumer::operator()(int)
 1000ms Channel operation: chan.push(i)
 1000ms  -> channel_op_status: Success
 1000ms  -> popped: 1
 1000ms Channel operation: chan.pop(i)
 1000ms  -> channel_op_status: Success
 1000ms Leave void Consumer::operator()(int)
 2000ms Enter void Consumer::operator()(int)
 2000ms Channel operation: chan.push(i)
 2000ms  -> channel_op_status: Success
 2000ms  -> popped: 2
 2000ms Channel operation: chan.pop(i)
 2000ms  -> channel_op_status: Success
 2000ms Leave void Consumer::operator()(int)
 3000ms Enter void Consumer::operator()(int)
 3000ms Channel operation: chan.push(i)
 3000ms  -> channel_op_status: Success
 3000ms  -> popped: 3
 3000ms Channel operation: chan.pop(i)
 3000ms  -> channel_op_status: Success
 3000ms Leave void Consumer::operator()(int)
 4000ms Enter void Consumer::operator()(int)
 4000ms Channel operation: chan.push(i)
 4000ms  -> channel_op_status: Success
 4000ms  -> popped: 4
 4000ms Channel operation: chan.pop(i)
 4000ms  -> channel_op_status: Success
 4000ms Leave void Consumer::operator()(int)
 5001ms Enter void Consumer::operator()(int)
 5001ms Channel operation: chan.push(i)
 5001ms  -> channel_op_status: Success
 5001ms  -> popped: 5
 5001ms Channel operation: chan.pop(i)
 5001ms  -> channel_op_status: Success
 5001ms Leave void Consumer::operator()(int)
 6001ms Enter void Consumer::operator()(int)
 6001ms Channel operation: chan.push(i)
 6001ms  -> channel_op_status: Success
 6001ms  -> popped: 6
 6001ms Channel operation: chan.pop(i)
 6001ms  -> channel_op_status: Success
 6001ms Leave void Consumer::operator()(int)
 7001ms Enter void Consumer::operator()(int)
 7001ms Channel operation: chan.push(i)
 7001ms  -> channel_op_status: Success
 7001ms  -> popped: 7
 7001ms Channel operation: chan.pop(i)
 7001ms  -> channel_op_status: Success
 7001ms Leave void Consumer::operator()(int)
 8001ms Enter void Consumer::operator()(int)
 8001ms Channel operation: chan.push(i)
 8001ms  -> channel_op_status: Success
 8001ms  -> popped: 8
 8001ms Channel operation: chan.pop(i)
 8001ms  -> channel_op_status: Success
 8001ms Leave void Consumer::operator()(int)
 9001ms Enter void Consumer::operator()(int)
 9001ms Channel operation: chan.push(i)
 9001ms  -> channel_op_status: Success
 9001ms  -> popped: 9
 9001ms Leave void Consumer::fiber_run()
 9002ms  -> channel_op_status: Success
 9002ms Leave void Consumer::operator()(int)
10002ms Leave void Producer::fiber_run()
11000ms done.
11000ms Destruct Producer
11000ms Destruct Consumer
11000ms Leave int main()

But...

That doesn't seem to scale. Right. You might want to

  1. derive Consumer/Producer from std::enable_shared_from_this to automatically govern lifetimes even though you detach the fibers
  2. don't detach the fibers, so you can join them before destruction of the related objects
  3. cooperate for graceful shutdown. E.g. you could detect a closed channel as the sign to stop fibers, or you could push a special sentinel value (e.g. 9 in this example) to signal the end

Combining elements of options 2 and 3 here. Note also the commented bit to test for forced early shutdown:

//// E.g. for forced shutdown:
// { sleep_for(4s); c.close(); }

Live On Wandbox

#include <boost/fiber/all.hpp>
#include <boost/signals2/signal.hpp>
#include <iomanip>
#include <iostream>

using namespace std::chrono_literals;
using boost::this_fiber::sleep_for;
using channel_t = boost::fibers::unbuffered_channel<int>;
using boost::fibers::fiber;
// Launch policy passed to every fiber constructed in this listing.
static constexpr auto how = boost::fibers::launch::dispatch;

/// Whole milliseconds elapsed since the first call to stamp(), as "<n>ms".
static auto stamp() {
    using clock = std::chrono::steady_clock;
    static const auto origin = clock::now(); // fixed on the first call
    const auto elapsed =
        std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() - origin);
    return std::to_string(elapsed.count()) + "ms";
}
/// Writes one complete log line to std::cout: the timestamp prefix (width 7,
/// then a space), every argument in order, and a flushing newline.
static auto& log(auto const&... args) {
    auto& out = std::cout;
    out << std::setw(7) << stamp() << " ";
    (out << ... << args);
    return out << std::endl;
}

/// Scope tracer: logs "<e> <n>" when constructed and "<l> <n>" when destroyed.
///
/// The three strings are stored as raw pointers and are not copied, so they
/// must stay valid for the tracer's whole lifetime.
struct tracer_t {
    using S = char const*;
    S n; ///< name logged on both lines
    S e; ///< word logged in front of the name on construction
    S l; ///< word logged in front of the name on destruction

    tracer_t(S n, S e, S l) : n(n), e(e), l(l) { log(e, " ", n); }
    ~tracer_t() { log(l, " ", n); }

    // The destructor has a visible side effect, so a copy would log a second,
    // unmatched closing line. Forbid copying instead of allowing it silently.
    tracer_t(tracer_t const&)            = delete;
    tracer_t& operator=(tracer_t const&) = delete;
};

struct fun_tracer_t : tracer_t { fun_tracer_t(S n) : tracer_t(n, "Enter", "Leave") {} };
struct obj_tracer_t : tracer_t { obj_tracer_t(S n) : tracer_t(n, "Construct", "Destruct") {} };

// Declares a fun_tracer_t named `_local`, constructed from the enclosing
// function's __PRETTY_FUNCTION__ string; place it first in a function body.
#define FUNTRACE fun_tracer_t _local(__PRETTY_FUNCTION__)

// End-of-stream marker: the producer emits it after its last value and the
// consumer stops popping once it receives it.
static constexpr int SENTINEL = -99;

struct Producer {
    obj_tracer_t _lifetime{"Producer"};
    boost::signals2::signal<bool(int)> _sig;
    fiber _fib;

    ~Producer() {
        FUNTRACE;
        if (_fib.joinable())
            _fib.join();
    }

    void start() {
        FUNTRACE;
        _fib = {how, &Producer::fiber_run, this};
    }

    void fiber_run() {
        FUNTRACE;
        for (int i = 0; i < 10; i++) {
            if (!_sig(i).value_or(false)) {
                log("Aborting producer");
                return;
            }
            sleep_for(1s);
        }
        log("Closing producer");
        _sig(SENTINEL);
    }
};

/// Logs the name of channel status `s` and returns true only for success.
bool checked(boost::fibers::channel_op_status s) {
    using S = boost::fibers::channel_op_status;

    // Any status not listed below is logged as "?".
    char const* label = "?";
    switch (s) {
    case S::closed:  label = "Closed";  break;
    case S::empty:   label = "Empty";   break;
    case S::timeout: label = "Timeout"; break;
    case S::full:    label = "Full";    break;
    case S::success: label = "Success"; break;
    default: break;
    }

    log(" -> channel_op_status: ", label);
    return S::success == s;
}

// CHECKED(op): logs the source text of `op`, then evaluates it and passes the
// resulting status to checked(); the whole expression yields checked()'s bool.
#define CHECKED(op)                                                            \
    [&] {                                                                      \
        log("Channel operation: ", #op);                                       \
        return checked((op));                                                  \
    }()

struct Consumer {
    obj_tracer_t _lifetime{"Consumer"};

    channel_t _chan;
    fiber     _fib;

    ~Consumer() {
        FUNTRACE;
        close();
        if (_fib.joinable())
            _fib.join();
    }

    void start() { FUNTRACE;
        _fib = {how, &Consumer::fiber_run, this};
    }

    void close() {
        if (!_chan.is_closed())
            _chan.close();
    }

    void fiber_run() {
        FUNTRACE;
        for (int i = 0; CHECKED(_chan.pop(i));) {
            log(" -> popped: ", i);
            if (i == SENTINEL)
                break;
        }

        close();
    }
    bool operator()(int i) {
        FUNTRACE;
        return CHECKED(_chan.push(i));
    }    
};

// Demo driver: connects the consumer to the producer's signal and starts both
// fibers. Returns 0 on the normal path and 1 if an exception was caught.
int main() {
    try {
        FUNTRACE;
        // `c` is declared before `p`, so `p` is destroyed first when the scope ends.
        Consumer c;
        Producer p;

        // std::ref: the signal calls `c` itself rather than a copy of it.
        p._sig.connect(std::ref(c));

        p.start();
        c.start();

        log("done.");

        //// E.g. for forced shutdown:
        // { sleep_for(4s); c.close(); }
        // Leaving the scope runs ~Producer and ~Consumer, each of which joins its fiber.
        return 0;
    } catch (std::exception const& e) {
        log("exception: ", e.what());
    } catch (...) {
        log("unhandled exception");
    }
    // Reached only after one of the handlers above has run.
    log("exit.");
    return 1;
}

Printing the expected

    0ms Enter int main()
    0ms Construct Consumer
    0ms Construct Producer
    0ms Enter void Producer::start()
    0ms Enter void Producer::fiber_run()
    0ms Enter bool Consumer::operator()(int)
    0ms Channel operation: _chan.push(i)
    0ms Leave void Producer::start()
    0ms Enter void Consumer::start()
    0ms Enter void Consumer::fiber_run()
    0ms Channel operation: _chan.pop(i)
    0ms  -> channel_op_status: Success
    0ms  -> popped: 0
// ...
10002ms Closing producer
10002ms Enter bool Consumer::operator()(int)
10002ms Channel operation: _chan.push(i)
10002ms  -> channel_op_status: Success
10002ms  -> popped: -99
10002ms Leave void Consumer::fiber_run()
10002ms  -> channel_op_status: Closed
10002ms Leave bool Consumer::operator()(int)
10002ms Leave void Producer::fiber_run()
10002ms Leave Producer::~Producer()
10002ms Destruct Producer
10002ms Enter Consumer::~Consumer()
10002ms Leave Consumer::~Consumer()
10002ms Destruct Consumer
10002ms Leave int main()

Upvotes: 1

Related Questions