I want to use a C API in a pipeline of coroutines communicating through channels. This is my first try with coroutines and my knowledge of them is limited.
The shape of the pipeline is:
 --------    1    ---------    2    ------
| source | ----> | process | ----> | sink |
 --------         ---------         ------
Each box represents a coroutine and each arrow a channel.
The C API is used in the process coroutine. Its signature is roughly: bool start_work(consumer_callback). This API is synchronous and calls consumer_callback once for each item of data it produces.
I first considered writing to channel 2 (see the diagram above) from inside the callback, but that would require changing the callback's signature, so it is not possible.
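Because start_work is synchronous, one possible alternative is to let the callback only append to a buffer reached through the void* argument, and to write to the channel after start_work has returned. The following is a minimal sketch under that assumption: it reuses the Callback shape from the question, start_work is a stand-in mirroring the simplified version, and Buffer, collecting_callback and collect_all are hypothetical names introduced for illustration.

```cpp
#include <cassert>
#include <vector>

// Same callback shape as in the question: value, user data, done flag.
using Callback = void (*)(int, void*, bool);

// Stand-in for the synchronous C API (mirrors the simplified version in the question).
void start_work(void* data, Callback cb) {
    for (int i = 0; i < 10; ++i) {
        cb(i, data, false);  // one call per produced value
    }
    cb(0, data, true);       // final call signals completion
}

// Hypothetical buffer handed to the C API through the void* argument.
struct Buffer {
    std::vector<int> items;
    bool done = false;
};

// Captureless callback: it only records the value and never touches a coroutine.
void collecting_callback(int i, void* data, bool done) {
    auto& buf = *static_cast<Buffer*>(data);
    if (done) {
        buf.done = true;
    } else {
        buf.items.push_back(i);
    }
}

// Runs the C API to completion and returns everything it produced.
std::vector<int> collect_all() {
    Buffer buf;
    start_work(&buf, collecting_callback);
    assert(buf.done);  // the API is synchronous, so it has finished by now
    return buf.items;
}
```

Inside process, the inner loop could then become something like `for (int i : collect_all()) co_await out.write(i);`. The trade-off is that all results are held in memory before the first one reaches the sink.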
I switched to passing a coroutine handle to the callback, which resumes it. The resumed coroutine then writes the data to channel 2.
The simplified code is:
#include <cassert>    // assert
#include <coroutine>
#include <optional>
#include <string>
#include <utility>    // std::exchange
#include <boost/cobalt/channel.hpp>
#include <boost/cobalt/main.hpp>
#include <boost/cobalt/promise.hpp>
#include <boost/cobalt/join.hpp>
namespace cobalt = boost::cobalt;
// Data to communicate between the callback and the channel writer.
struct Data {
    std::optional<int> result;
    bool done = false;
    std::coroutine_handle<> coro_handle;
};
using Callback = void (*)(int, void*, bool);
void consumer_callback(int i, void* data, bool done) {
    Data& data_ = *reinterpret_cast<Data*>(data);
    data_.done = done;
    if (!done) {
        data_.result = i;
    }
    data_.coro_handle.resume();
}
// C API that produces results and calls the callback to consume each result.
// Results are integers.
void start_work(void* data, Callback cb) {
    bool done = false;
    for (int i = 0; i < 10; ++i) {
        cb(i, data, done); // !done case
    }
    done = true;
    cb(0, data, done); // done case
}
struct Awaiter : std::suspend_always {
    Data& data;
    bool first;
    bool await_ready() {
        return data.result.has_value();
    }
    void await_suspend(std::coroutine_handle<> h) {
        data.coro_handle = h;
        if (first) start_work(&data, consumer_callback);
    }
    int await_resume() {
        assert(data.result.has_value());
        auto opt = std::exchange(data.result, std::nullopt);
        return opt.value();
    }
};
Awaiter nextResult(Data& data, bool first) {
    return {{}, data, first};
}
cobalt::promise<void> source(cobalt::channel<std::string>& out) {
    co_await out.write("Hello world!");
    out.close();
}
cobalt::promise<void> process(cobalt::channel<std::string>& in, cobalt::channel<int>& out) {
    Data data;
    while (in.is_open() && out.is_open()) {
        auto _ = co_await in.read(); // ignore result for now
        auto first = true;
        while (!data.done || data.result.has_value()) {
            auto i = co_await nextResult(data, first);
            co_await out.write(i);
            first = false;
        }
    }
    in.close();
    out.close();
}
cobalt::promise<void> sink(cobalt::channel<int>& in) {
    while (in.is_open()) {
        auto i = co_await in.read(); // ignore result for now
    }
    in.close();
}
cobalt::main co_main(int argc, char* argv[]) {
    cobalt::channel<std::string> a;
    cobalt::channel<int> b;
    co_await cobalt::join(
        source(a),
        process(a, b),
        sink(b)
    );
    co_return 0;
}
The sink correctly receives all the data, but when the process coroutine finishes, a coroutine resume on a null pointer happens inside Asio. What am I doing wrong?
Thanks!
Environment:
Ubuntu 20.04
Boost 1.85
g++13 -std=gnu++2a