Brandon Ross Pollack

Reputation: 707

Piping into a child process and then into another child process

I am trying to create two pipes. The parent process reads the input file given in argv[1] line by line and writes it into the first pipe, which feeds a mapper process that does some work; the mapper's output is then piped into a reducer process which reduces it.

When I run my mapper and reducer like this in bash:

./mapper < input.txt | reducer

It works perfectly, but the following program outputs nothing and hangs on the wait(NULL) call.

My Code

#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <stdlib.h>
#include <stdio.h>

void checkForkError(pid_t pid);
void mapperSetup(int mapperPipe[]);
void reducerSetup(int reducerPipe[]);

int main(int argc, char* argv[]) {
    if(argc < 2) {
        printf("please specify an input file\n");
        exit(1);
    }
    int mapperPipe[2]; //last index write end, first index read end

    if (pipe(mapperPipe) == -1) {
           perror("error piping");
           exit(EXIT_FAILURE);
    }

    pid_t firstChild = fork();

    checkForkError(firstChild);

    if(firstChild == 0) { //child
        mapperSetup(mapperPipe);
    }
    else {
        close(mapperPipe[0]);
        close(STDOUT_FILENO);
        dup(mapperPipe[1]);
        FILE* in = fopen(argv[1], "r");
        if(in == NULL) {
            perror("error opening file");
            exit(EXIT_FAILURE);
        }
        ssize_t read;
        size_t n = 0;
        char* line = NULL;
        while((read = getline(&line, &n, in)) != -1) {
            write(STDOUT_FILENO, line, read);
        }
        close(STDOUT_FILENO);
        free(line);
        wait(NULL);
    }
}

void inline checkForkError(pid_t pid) {
    if(pid < 0) {
        perror("error forking!!!");
    }
}

void mapperSetup(int mapperPipe[]) {
    int reducerPipe[2];

    if(pipe(reducerPipe) == -1) {
        perror("error piping");
        exit(EXIT_FAILURE);
    }

    pid_t secondChild = fork();

    checkForkError(secondChild);
    if(secondChild == 0) { //reducer process
        reducerSetup(reducerPipe);
    }
    else { //mapper process
        close(mapperPipe[1]); //close write end
        close(STDIN_FILENO); //close stdin
        dup(mapperPipe[0]); //dup pipe out to stdin

        close(reducerPipe[0]); //close read end
        close(STDOUT_FILENO); //close stdout
        dup(reducerPipe[1]); //dup output to reducer pipe

        if(execv("mapper", (char *[]){"mapper", NULL}) == -1) {
            perror("exec error");
            exit(EXIT_FAILURE);
        }
    }
}

void reducerSetup(int reducerPipe[]) {
    close(reducerPipe[1]); //close write end of second pipe
    close(STDIN_FILENO); //close stdin
    dup(reducerPipe[0]); //dup read end of pipe to stdin

    if(execv("reducer", (char *[]){"reducer", NULL}) != -1) {
        perror("exec error");
        exit(EXIT_FAILURE);
    }
}

Upvotes: 1

Views: 144

Answers (1)

Brandon Ross Pollack

Reputation: 707

The problem is that after a dup you have multiple file descriptors referring to the same pipe end, and you must close the original as well as the duplicate when you are done, otherwise EOF is never delivered to the reader.

In short, a dup increments the reference count on the underlying pipe end, and the write side only counts as closed once every descriptor referring to it has been closed.
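
For example, the parent's write side would look roughly like this once the extra close is added (a sketch using the same variable names as in the question):

close(mapperPipe[0]);   /* parent never reads from this pipe */
close(STDOUT_FILENO);
dup(mapperPipe[1]);     /* stdout now refers to the pipe's write end */
close(mapperPipe[1]);   /* close the original descriptor; the dup keeps the pipe open */

/* ... write the input lines to STDOUT_FILENO ... */

close(STDOUT_FILENO);   /* last write end gone: the mapper finally sees EOF */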

The other problem is that my process tree is linear (the reducer is a child of the mapper) rather than both children belonging to one parent, so the main process exits before the output is produced. bash then prints the output after the command appears to have finished, which made it look like the program was hanging when it wasn't.

The solution is a little restructuring: create both pipes in the parent process and fork both children from it, as sketched below.
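
A minimal sketch of that restructuring (fork error checks trimmed for brevity; it assumes the mapper and reducer binaries live in the current working directory, as in the original execv calls):

#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

int main(int argc, char* argv[]) {
    if(argc < 2) {
        printf("please specify an input file\n");
        exit(1);
    }

    int mapperPipe[2];  /* parent -> mapper */
    int reducerPipe[2]; /* mapper -> reducer */
    if(pipe(mapperPipe) == -1 || pipe(reducerPipe) == -1) {
        perror("error piping");
        exit(EXIT_FAILURE);
    }

    pid_t mapper = fork();
    if(mapper == 0) { /* mapper child: reads mapperPipe, writes reducerPipe */
        dup2(mapperPipe[0], STDIN_FILENO);
        dup2(reducerPipe[1], STDOUT_FILENO);
        close(mapperPipe[0]);  close(mapperPipe[1]);
        close(reducerPipe[0]); close(reducerPipe[1]);
        execv("mapper", (char *[]){"mapper", NULL});
        perror("exec error");
        exit(EXIT_FAILURE);
    }

    pid_t reducer = fork();
    if(reducer == 0) { /* reducer child: reads reducerPipe */
        dup2(reducerPipe[0], STDIN_FILENO);
        close(mapperPipe[0]);  close(mapperPipe[1]);
        close(reducerPipe[0]); close(reducerPipe[1]);
        execv("reducer", (char *[]){"reducer", NULL});
        perror("exec error");
        exit(EXIT_FAILURE);
    }

    /* parent: keep only the write end of the mapper pipe */
    close(mapperPipe[0]);
    close(reducerPipe[0]);
    close(reducerPipe[1]);

    FILE* in = fopen(argv[1], "r");
    if(in == NULL) {
        perror("error opening file");
        exit(EXIT_FAILURE);
    }

    char* line = NULL;
    size_t n = 0;
    ssize_t len;
    while((len = getline(&line, &n, in)) != -1) {
        write(mapperPipe[1], line, (size_t)len);
    }
    free(line);
    fclose(in);
    close(mapperPipe[1]); /* last write end closed: mapper sees EOF, then the reducer does too */

    wait(NULL);
    wait(NULL);
}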

Special thanks to Russell Reed, who helped me out.

Upvotes: 1
