dvnh87

Boost.Cobalt: how to use channels from a C API callback?

I want to use a C API in a pipeline of coroutines communicating through channels. This is my first try with coroutines and my knowledge of them is limited.

The shape of the pipeline is:

 --------    1    ---------    2    ------
| source | ----> | process | ----> | sink |
 --------         ---------         ------

Each box represents a coroutine and each arrow a channel.

The C API is used in the process coroutine.

Its signature is roughly: bool start_work(consumer_callback). The API is synchronous and calls consumer_callback once for each result it produces.
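To make "synchronous" concrete, here is a minimal stdlib-only sketch of the callback contract. The stand-in start_work below is an assumption modelled on the simplified code in this post (three results instead of ten), and Collected, collect and run_to_completion are hypothetical names used only for illustration. It shows that every callback invocation has already happened by the time start_work returns.

```cpp
#include <cassert>
#include <vector>

// Same callback shape as in the post: value, opaque user data, done flag.
using Callback = void (*)(int value, void* user_data, bool done);

// Hypothetical stand-in for the C API: it calls cb once per result and
// once more with done == true, all before returning to its caller.
void start_work(void* user_data, Callback cb) {
    for (int i = 0; i < 3; ++i) {
        cb(i, user_data, false);
    }
    cb(0, user_data, true);
}

// State reached through the opaque user-data pointer.
struct Collected {
    std::vector<int> values;
    bool done = false;
};

// Callback that only records what it receives; it does not resume anything.
void collect(int value, void* user_data, bool done) {
    auto& c = *static_cast<Collected*>(user_data);
    if (done) {
        c.done = true;
    } else {
        c.values.push_back(value);
    }
}

// Runs the API once; all results are available as soon as it returns.
Collected run_to_completion() {
    Collected c;
    start_work(&c, collect);
    assert(c.done); // the done callback already ran inside start_work
    return c;
}
```

Calling run_to_completion() returns a Collected holding the values 0, 1 and 2 with done set, because the API never hands control back between results.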

I first considered writing to channel 2 (see the diagram above) directly from the callback, but that would require changing the callback's signature, which is not possible.

Instead, I pass a coroutine handle to the callback, and the callback resumes that coroutine. The resumed coroutine then writes the data to channel 2.

The simplified code is:

#include <cassert>   // assert
#include <coroutine>
#include <optional>
#include <string>
#include <utility>   // std::exchange
#include <boost/cobalt/channel.hpp>
#include <boost/cobalt/main.hpp>
#include <boost/cobalt/promise.hpp>
#include <boost/cobalt/join.hpp>

namespace cobalt = boost::cobalt;

// Data to communicate between the callback and the channel writer.
struct Data {
    std::optional<int> result;
    bool done = false;
    std::coroutine_handle<> coro_handle;
};

using Callback = void (*)(int, void*, bool);

void consumer_callback(int i, void* data, bool done) {
    Data& data_ = *static_cast<Data*>(data);
    data_.done = done;
    if (!done) {
        data_.result = i;
    }
    data_.coro_handle.resume();
}

// C API that produces results and calls the callback to consume each result.
// Results are integers.
void start_work(void* data, Callback cb) {
    bool done = false;
    for (int i = 0; i < 10; ++i) {
        cb(i, data, done); // !done case
    }
    done = true;
    cb(0, data, done); // done case
}

struct Awaiter : std::suspend_always {
    Data& data;
    bool first;

    bool await_ready() {
        return data.result.has_value();
    }

    void await_suspend(std::coroutine_handle<> h) {
        data.coro_handle = h;
        if (first) start_work(&data, consumer_callback);
    }

    int await_resume() {
        assert(data.result.has_value());
        auto opt = std::exchange(data.result, std::nullopt);
        return opt.value();
    }
};

Awaiter nextResult(Data& data, bool first) {
    return {{}, data, first};
}

cobalt::promise<void> source(cobalt::channel<std::string>& out) {
    co_await out.write("Hello world!");
    out.close();
}

cobalt::promise<void> process(cobalt::channel<std::string>& in, cobalt::channel<int>& out) {
    Data data;
    while (in.is_open() && out.is_open()) {
        auto _ = co_await in.read(); // ignore result for now
        auto first = true;
        while (!data.done || data.result.has_value()) {
            auto i = co_await nextResult(data, first);
            co_await out.write(i);
            first = false;
        }
    }
    in.close();
    out.close();
}

cobalt::promise<void> sink(cobalt::channel<int>& in) {
    while (in.is_open()) {
        auto i = co_await in.read(); // ignore result for now
    }
    in.close();
}

cobalt::main co_main(int argc, char* argv[]) {
    cobalt::channel<std::string> a;
    cobalt::channel<int> b;
    co_await cobalt::join(
        source(a),
        process(a, b),
        sink(b)
    );
    co_return 0;
}

The sink correctly receives all the data, but once the process coroutine is done, a coroutine resume on a null pointer happens inside Asio. What am I doing wrong? Thanks!

Environment:

Ubuntu 20.04

Boost 1.85

g++13 -std=gnu++2a
