Reputation: 1335
I am programming a personal project which one part being the ability to execute given programs and handle their input/output. I've read a lot of similar questions and learnt a lot from them but I am stuck now into a problem I don't find the cause and I think it is a race condition when piping the stdout to next program stdin if there are more than 2 in the "chain".
When chaining executables (with exec >> exec2 >> exec3
...) the std::thread
to the function Executable::consume_and_forward
is created which reads all stdout (until EOF) and writes to bound program's stdin.
I made a main file with some test and the last one fails with SIGPIPE even when I'm polling before write (which should not happen?), can someone please tell me what's wrong? (BTW the code will be merged to master and be opensource, currently it is in the 2.0.0 working branch).
EDIT: As stated in the comments, sed is correctly executed by execvp but dies soon, then when trying to write SIGPIPE is sent. The strange thing is that the pipe is reported POLLOUT despite the child has been killed.
EDIT2: Now that sed is fixed I got a block in consume_and_forward
when trying to read from the first command. This is strange because only commenting sed parts from test4
makes it work. Why is then the read
from pipe blocking?
Current output:
Test 1: OK
Test 2: OK
Test 3: OK
Test 4: (SIGPIPE and aborts)
EDIT: By removing comment on main to just ignore SIGPIPE
and get the test failing with no abort but this is not an option since it should work and the code flaw is elsewhere.
Since I use polling to try to avoid to write from a closed pipe, why is this failing?
main.cxx
#include "executable.hxx"
#include "pipe.hxx"
#include <iostream>
#include <sys/wait.h>
#include <signal.h>
void test_result(const std::string& expected, const std::string& real) {
if (expected == real)
std::cout << "OK";
else
std::cout << "ERROR (expected: " << expected << ", got: " << real << ")";
std::cout << std::endl;
}
void test_result(const std::string& expected, const std::optional<std::string>& real) {
test_result(expected, real.value_or("<EMPTY>"));
}
void test1() {
std::cout << "Test 1: " << std::flush;
Executable exec1("/usr/bin/sort", {"-"});
const std::string expected = "1\n2\n3\n";
std::optional<std::string> returned;
exec1 << "3\n" << "2\n" << "1\n" << Executable::EoF;
exec1.wait();
exec1 >> returned;
test_result(expected, returned);
}
void test2() {
std::cout << "Test 2: " << std::flush;
Executable exec2("/usr/bin/tr", {"-d", "\n"});
const std::string expected = "321";
std::optional<std::string> returned;
exec2 << "3\n" << "2\n" << "1\n" << Executable::EoF;
exec2.wait();
exec2 >> returned;
test_result(expected, returned);
}
void test3() {
std::cout << "Test 3: " << std::flush;
Executable tr("/usr/bin/tr", {"-d", "\n"});
Executable sort("/usr/bin/sort", {"-"});
const std::string expected = "123";
std::optional<std::string> returned;
sort >> tr;
sort << "3\n" << "2\n" << "1\n" << Executable::EoF;
sort.wait();
tr.wait();
tr >> returned;
test_result(expected, returned);
}
void test4() {
/*** Here commenting sed parts and getting tr output makes it work ***/
std::cout << "Test 4: " << std::flush;
Executable tr("/usr/bin/tr", {"-d", "\n"});
Executable sort("/usr/bin/sort", {"-"});
Executable sed("/bin/sed", {"-e", "'s/3/9/'"});
const std::string expected = "129";
std::optional<std::string> returned;
sort >> tr;
tr >> sed;
sort << "3\n" << "2\n" << "1\n" << Executable::EoF;
sort.wait();
tr.wait();
sed.wait();
sed >> returned;
test_result(expected, returned);
}
int main() {
//signal(SIGPIPE, SIG_IGN);
test1();
test2();
test3();
test4();
return 0;
}
pipe.hxx
#pragma once
#include <optional>
#include <stddef.h>
#include <string>
#include <sys/poll.h>
class Pipe {
public:
Pipe();
Pipe(const Pipe&) = delete;
Pipe(Pipe&&) = default;
Pipe& operator=(const Pipe&) = delete;
Pipe& operator=(Pipe&&) = default;
~Pipe();
void bind_read(int);
void bind_read(Pipe&);
void bind_write(int);
void bind_write(Pipe&);
void close_read();
void close_write();
int poll(int) const;
bool has_read_event(unsigned short) const;
bool has_write_event(unsigned short) const;
Pipe& operator<<(const std::string&);
std::optional<std::string>& operator>>(std::optional<std::string>&) const;
private:
void write(const std::string&);
std::optional<std::string> read() const;
void bind(int&, int);
void close(int&);
void init();
int m_fd[2];
mutable pollfd m_fd_data[2];
static constexpr ssize_t MAX_BYTES = 1024 * 1024; // 1MB
};
pipe.cxx
#include "pipe.hxx"
#include <unistd.h>
Pipe::Pipe() {
init();
}
Pipe::~Pipe() {
close_read();
close_write();
}
void Pipe::bind_read(int dest) {
bind(m_fd[0], dest);
}
void Pipe::bind_write(int dest) {
bind(m_fd[1], dest);
}
void Pipe::close_read() {
close(m_fd[0]);
}
void Pipe::close_write() {
close(m_fd[1]);
}
int Pipe::poll(int timeout) const {
return ::poll(m_fd_data, 2, timeout);
}
bool Pipe::has_read_event(unsigned short event) const {
return (m_fd_data[0].revents & event) == event;
}
bool Pipe::has_write_event(unsigned short event) const {
return (m_fd_data[1].revents & event) == event;
}
Pipe& Pipe::operator<<(const std::string& data) {
write(data);
return *this;
}
std::optional<std::string>& Pipe::operator>>(std::optional<std::string>& out) const {
std::optional<std::string> data = read();
if (data) {
if (out)
out = *out + *data;
else
out = *data;
}
return out;
}
void Pipe::write(const std::string& str) {
bool retry = true;
do {
poll(-1);
if (has_write_event(POLLHUP)) {
retry = false;
}
else if (has_write_event(POLLOUT)) {
::write(m_fd[1], str.c_str(), sizeof(char) * str.length());
retry = false;
}
} while (retry);
}
std::optional<std::string> Pipe::read() const {
std::optional<std::string> result;
bool retry = true;
do {
poll(-1);
ssize_t bytes;
if (has_read_event(POLLIN)) {
char buffer[MAX_BYTES];
std::string data = "";
while ((bytes = ::read(m_fd[0], buffer, MAX_BYTES)) > 0) {
data += std::string(buffer, bytes);
};
if (!data.empty()) {
result = std::move(data);
}
retry = false;
}
else if (has_read_event(POLLHUP)) {
retry = false;
}
} while(retry);
return result;
}
void Pipe::bind(int& src, int dest) {
dup2(src, dest);
close(src); // Here maybe use ::close instead
}
void Pipe::close(int& fd) {
::close(fd);
}
void Pipe::init() {
pipe(m_fd);
m_fd_data[0].fd = m_fd[0];
m_fd_data[0].events = POLLIN;
m_fd_data[0].revents = 0;
m_fd_data[1].fd = m_fd[1];
m_fd_data[1].events = POLLOUT;
m_fd_data[1].revents = 0;
}
executable.hxx
#pragma once
#include "pipe.hxx"
#include <iostream>
#include <optional>
#include <string>
#include <thread>
#include <unistd.h>
#include <vector>
class Executable {
public:
Executable(const std::string& prog, const std::vector<std::string>& args = std::vector<std::string>());
Executable(std::string&&, std::vector<std::string>&&);
Executable(const Executable&) = delete;
Executable(Executable&&) = default;
Executable& operator=(const Executable&) = delete;
Executable& operator=(Executable&&) = default;
~Executable() = default;
int wait();
struct _EoF {};
static constexpr _EoF EoF = {};
Executable& operator>>(Executable&);
std::optional<std::string>& operator>>(std::optional<std::string>&);
friend std::ostream& operator<<(std::ostream&, const Executable&);
Executable& operator<<(const std::string&);
void operator<<(const _EoF&);
public:
void write(const std::string&);
void run();
void consume_and_forward(Executable&);
std::string m_program;
std::vector<std::string> m_arguments;
pid_t m_pid;
Pipe m_pstdout, m_pstdin, m_pstderr;
std::unique_ptr<std::thread> m_forwarder;
static constexpr ssize_t BUFFER_SIZE = 1024 * 1024; // 1MiB
};
executable.cxx
#include "executable.hxx"
#include <filesystem>
#include <sys/wait.h>
Executable::Executable(const std::string& prog, const std::vector<std::string>& args):m_program(prog), m_arguments(args) {
run();
}
Executable::Executable(std::string&& prog, std::vector<std::string>&& args):m_program(std::move(prog)), m_arguments(std::move(args)) {
run();
}
Executable& Executable::operator>>(Executable& exe) {
// Do something here
m_forwarder.reset(new std::thread(&Executable::consume_and_forward, this, std::ref(exe)));
return exe;
}
std::optional<std::string>& Executable::operator>>(std::optional<std::string>& data) {
m_pstdout >> data;
return data;
}
std::ostream& operator<<(std::ostream& os, const Executable& exe) {
std::optional<std::string> data;
exe.m_pstdout >> data;
if (data) os << *data;
return os;
}
Executable& Executable::operator<<(const std::string& data) {
m_pstdin << data;
return *this;
}
void Executable::operator<<(const _EoF&) {
m_pstdin.close_write();
}
void Executable::run() {
m_pid = fork();
if (m_pid == 0) {
/* STDIN: Child reads from STDIN but does not write to */
m_pstdin.close_write();
m_pstdin.bind_read(STDIN_FILENO);
/* STDOUT: Child writes to STDOUT but does not read from */
m_pstdout.close_read();
m_pstdout.bind_write(STDOUT_FILENO);
/* STDERR: Child writes to STDERR but does not read from */
m_pstdout.close_read();
m_pstdout.bind_write(STDERR_FILENO);
std::string program_file = std::filesystem::path(m_program).filename().string();
std::vector<char*> argv;
argv.reserve(m_arguments.size() + 2);
argv.push_back(program_file.data());
for (size_t i = 0; i < m_arguments.size(); i++)
argv.push_back(m_arguments[i].data());
argv.push_back(NULL);
execvp(m_program.data(), argv.data());
// If we reach here then we failed to execute the program
exit(0);
}
else {
/* STDIN: Parent writes to STDIN but does not read from */
m_pstdin.close_read();
/* STDOUT: Parent reads from to STDOUT but does not write to */
m_pstdout.close_write();
/* STDERR: Parent reads from to STDERR but does not write to */
m_pstderr.close_write();
}
}
void Executable::write(const std::string& str) {
m_pstdin << str;
}
int Executable::wait() {
int status;
if (m_forwarder)
m_forwarder->join();
waitpid(m_pid, &status, 0);
return status;
}
void Executable::consume_and_forward(Executable& exec) {
do {
std::optional<std::string> buffer;
m_pstdout.poll(-1);
m_pstdout >> buffer;
if (buffer)
exec.m_pstdin << *buffer;
} while (!m_pstdout.has_read_event(POLLHUP));
exec.m_pstdin.close_write();
}
Upvotes: 1
Views: 63