I ran into a roadblock. The code below has issues, but this is just a demo; I want to get the high level logic correct first.
Both applications print a lot of startup information before reaching the "ready" state. In that state, Program A is ready for user input via stdin. Program B just listens on a network connection, ingesting and recording data.
Ideally, with this sample program, I should be able to see the output from Program B, in "real-time". But at each loop iteration, nothing happens; I'm not sure it's receiving input via its pipe.
I was previously using bp::opstream to write to the stdin of the child (Program A). I know that if a command is accepted by Program A via its async_pipe, Program B should also show some logging info (e.g. "trip"). These are Windows console applications, and I'm using Boost C++ to interact with them as child processes.
Does anyone have any ideas what's going on?
std::size_t read_loop(bp::async_pipe& p, mutable_buffer buf, boost::system::error_code &err)
{
    return p.read_some(buf, err);
}
void read_loop_async(bp::async_pipe& p, mutable_buffer buf, std::error_code &err) {
    p.async_read_some(buf, [&p, buf, &err](std::error_code ec, size_t n) {
        std::cout << "Received " << n << " bytes (" << ec.message() << "): '";
        std::cout.write(boost::asio::buffer_cast<char const*>(buf), n) << std::endl;
        err = ec;
        if (!ec)
            read_loop_async(p, buf, err);
    });
}
void write_pipe(bp::async_pipe&p, mutable_buffer buf)
{
    ba::async_write(p, buf, [](boost::system::error_code ec, std::size_t sz)
    {
        std::cout << "Size Written " << sz << " Ec: " << ec << " " << ec.message() << '\n';
    });
}
int main()
{
    bp::opstream sendToChild;
    string wd = "<---path-to-working-dir----->";
    ba::io_service ios;
    string bin = "<path-to-bin-and-name>";
    bp::async_pipe input_pipe(ios);
    bp::async_pipe output_pipe(ios);
    bp::child c(bin, "arg1", "arg2", "arg3", bp::std_out > output_pipe,
                bp::std_in < input_pipe, ios, bp::start_dir(wd.c_str()));
    size_t size = 8192;
    string input;
    vector <char> buffer(size);
    boost::system::error_code ec;
    std::error_code err;
    ios.run();
    while (1)
    {
        //show read whatever is available from the childs output_pipe
        read_loop_async(output_pipe, bp::buffer(buffer), err);
        cout << "\nBoot-> ";
        cin >> input;
        if (input == "1")
        {
            cout << " send input to child: ";
            cin >> input;
            //send commands to the child, Program A
            //originally
            //sendToChild << input<< endl;
            write_pipe(input_pipe, bp::buffer(input));
        }
        if (input == "quit")
        {
            //sendToChild << input << endl;
            read_loop_async(output_pipe, bp::buffer(buffer), err);
            break;
        }
        ios.poll(ec);
        ios.restart();
    }
    c.join();
    cout << "done...";
    cin >> input;
}
Here is the link I followed: How to retrieve program output as soon as it printed?
Reputation: 393507
Hmm. There's a lot to unpack. First off:
ios.run();
runs until the child completes. It might well deadlock if the child process needs to send more output than fits in the pipe buffers, since you don't consume any of it before calling ios.run().
The next poll() by definition does not do anything, because you didn't call restart() first. Luckily you ignore the error codes, and restart() happens next.
Then you hit the next problem, because the next iteration of the loop starts with another read_loop_async(output_pipe, bp::buffer(buffer), err); call, which means you have overlapping read operations. That is usually forbidden (Undefined Behaviour), and it is UB here in any case because both reads use the same buffer.
This in itself is more than enough to explain "lost data": you're doing multiple reads into the same location, so one would clobber the other. That is, if you could reason about it, which you cannot, because it is UB.
Weirdly, my eye now spots even a third invocation of read_loop_async. It makes no sense. As the name suggests, read_loop_async is already a loop: it calls itself when completed:
if (!ec)
    read_loop_async(p, buf, err);
So only one invocation would ever be expected. It seems you don't grasp that async_* initiation functions always return immediately (because the operation completes asynchronously). This is also exemplified by the fact that you assign:
err = ec;
where err is a reference argument to the initiation function. It doesn't work like that: the error is only available on completion. Since you don't seem to use it outside the read loop anyway, I'll drop it.
Then there's
sendToChild << input << std::endl;
which does absolutely nothing, since sendToChild is only declared and never used elsewhere.
write_pipe again tries to use an async_ initiation, but it cannot, because it's being used in a synchronous input loop. Just don't use async there. As written it was another source of UB, because the buf argument would point to a std::string variable that was being mutated in the main function. So, simplify:
void write_pipe(bp::async_pipe& p, const_buffer buf) {
    error_code ec;
    auto sz = write(p, buf, ec);
    std::cout << "Size Written " << sz << " Ec: " << ec << " " << ec.message() << '\n';
}
[Note how it correctly marks buf as const_buffer.]
Now, the sendToChild use can probably be replaced with:
if (input == "quit") {
    write_pipe(input_pipe, bp::buffer(input + "\n"));
    input_pipe.close();
    break;
}
I'll replace the ios.restart() stuff with just poll(), because we no longer run() it too early.
Other than the above, I replaced the operator>> calls with std::getline calls, because most likely you want the user input to be delimited by Enter keys, not spaces. I also added "\n", as you had in the sendToChild line, because it helps when demonstrating with a simple test child that uses line-buffered input.
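The difference between the two input styles can be seen without any child process. The following is a small standard-library sketch; first_token and first_line are hypothetical helper names used only for this comparison.

```cpp
#include <sstream>
#include <string>

// Hypothetical helper: what `stream >> word` extracts from the start of text.
std::string first_token(const std::string& text) {
    std::istringstream in(text);
    std::string word;
    in >> word; // stops at the first whitespace character
    return word;
}

// Hypothetical helper: what std::getline extracts from the start of text.
std::string first_line(const std::string& text) {
    std::istringstream in(text);
    std::string line;
    std::getline(in, line); // stops at the newline, which is consumed but not stored
    return line;
}
```

For the input "Hello world\nBye world\n", first_token yields "Hello" while first_line yields "Hello world". Since std::getline drops the newline, the "\n" has to be appended again before the line is written to a child that reads line by line.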
Now, we'll use this as a test child:
bp::child c(bin, "-c",
            "time while read line; do echo \"$line\" | rev | xxd; done", //
            bp::std_out > output_pipe,
            bp::std_in < input_pipe, //
            ios,                     //
            bp::start_dir(wd));
This means we get our input echoed back reversed and as a hex dump, plus a time summary at the end.
#include <boost/asio.hpp>
#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <iostream>
#include <limits>
#include <string>
#include <vector>
namespace bp = boost::process;
using boost::asio::const_buffer;
using boost::asio::mutable_buffer;
using boost::system::error_code;

// Re-arms itself from its own completion handler, so call it only once.
void read_loop_async(bp::async_pipe& p, mutable_buffer buf) {
    p.async_read_some(buf, [&p, buf](std::error_code ec, size_t n) {
        std::cout << "Received " << n << " bytes (" << ec.message() << "): '";
        std::cout.write(boost::asio::buffer_cast<char const*>(buf), n) << std::endl;
        if (!ec)
            read_loop_async(p, buf);
    });
}

// Synchronous write: the buffer only needs to live for the duration of the call.
void write_pipe(bp::async_pipe& p, const_buffer buf) {
    error_code ec;
    auto sz = write(p, buf, ec);
    std::cout << "Size Written " << sz << " Ec: " << ec << " " << ec.message() << '\n';
}

int main() {
    std::string wd = "/home/sehe/Projects/stackoverflow";
    boost::asio::io_service ios;
    std::string bin = "/bin/bash";
    bp::async_pipe input_pipe(ios);
    bp::async_pipe output_pipe(ios);
    bp::child c(bin, "-c",
                "while read line; do echo \"$line\" | rev | xxd; done", //
                bp::std_out > output_pipe,
                bp::std_in < input_pipe, //
                ios,                     //
                bp::start_dir(wd));

    // Single invocation!
    std::vector<char> buffer(8192);
    read_loop_async(output_pipe, bp::buffer(buffer));

    std::cout << "\nBoot-> ";
    for (std::string input; getline(std::cin, input);
         std::cout << "\nBoot-> ") {
        if (input == "1") {
            std::cout << " send input to child: ";
            if (getline(std::cin, input)) {
                write_pipe(input_pipe, bp::buffer(input + "\n"));
            }
        }
        if (input == "quit") {
            write_pipe(input_pipe, bp::buffer(input + "\n"));
            input_pipe.close();
            break;
        }
        ios.poll(); // run any handlers that are ready, without blocking
    }
    ios.run(); // effectively like `c.wait();` but async
    std::cout << "done...";
    // ignore until line end
    std::cin.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
}
Tested with
g++ -std=c++20 -O2 -Wall -pedantic -pthread main.cpp -lboost_{system,filesystem} && ./a.out <<HERE
1
Hello world
Ignored
1
Bye world
quit
HERE
Prints
Boot-> send input to child: Size Written 12 Ec: system:0 Success
Boot->
Boot-> send input to child: Size Written 10 Ec: system:0 Success
Boot-> Size Written 5 Ec: system:0 Success
Received 64 bytes (Success): '00000000: 646c 726f 7720 6f6c 6c65 480a dlrow olleH.
Received 62 bytes (Success): '00000000: 646c 726f 7720 6579 420a dlrow eyB.
Received 57 bytes (Success): '00000000: 7469 7571 0a tiuq.
Received 0 bytes (End of file): '
done...
This is easier to follow when run interactively.