Kris
Kris

Reputation: 59

Signal propagation not working between child processes in multi-process C program

I'm working on a C program that creates three processes to read and process a file:

Process 1 reads the file line by line
Process 2 counts the lines
Process 3 (main process) maintains the final count

The processes communicate through pipes and should also respond to signals (SIGUSR1, SIGUSR2, SIGCONT, SIGINT) that need to be propagated between them. When one process receives a signal, it should propagate it to the other processes. The signals are setup like this:

signal(SIGUSR1, handle_s1);  // Termination
signal(SIGUSR2, handle_s2);  // Suspension
signal(SIGCONT, handle_s3);  // Resume
signal(SIGINT, handle_s4);   // Message handling

When a signal is sent to any process (except parent), it doesn't properly propagate to the other processes. The child processes (pid1 and pid2) don't seem to handle the signals correctly, they only perform the signal action on themselves.

What am I doing wrong in my signal propagation implementation? How can I ensure that signals are properly propagated between all three processes?

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <string.h>
#include <signal.h>
#include <time.h>

/* Size of Process 1's fgets() line buffer. */
#define BUFFER_SIZE 1024

/* Named pipes created in main() and unlinked again before it returns. */
#define FIFO_DIR "/tmp/"
#define FIFO_1   FIFO_DIR "fifo_1"
#define FIFO_2   FIFO_DIR "fifo_2"
#define FIFO_3   FIFO_DIR "fifo_3"

/* Fixed-size record written to the FIFOs by send_message_to_others(). */
typedef struct msg_record {
    int   signal_type;    /* sender-chosen code (callers in this file pass 4) */
    pid_t sender_pid;     /* getpid() of the sending process */
    char  message[256];   /* free-form text */
    int   line_count;     /* line number / count being reported */
} Message;

/* Flags written by the signal handlers and polled by the worker loops. */
volatile sig_atomic_t is_suspended;   /* set by S2 (SIGUSR2), cleared by S3 (SIGCONT) */
volatile sig_atomic_t should_exit;    /* set by S1 (SIGUSR1) */

time_t start_time;                    /* recorded in main() */

/*
 * Process ids of Process 1, Process 2 and Process 3 (the parent).
 * Only the parent ends up with all three set: fork() returns 0 in a child,
 * and a child never sees assignments the parent makes after its fork().
 */
pid_t pid1;
pid_t pid2;
pid_t pid3;

/*
 * Send `sig` to `target`, but only if `target` is a real process id.
 *
 * kill(0, sig) signals the caller's whole process group and kill(-1, sig)
 * every process the caller may signal -- never the intended peer.  The pid
 * globals are 0 until assigned, and pid1/pid2 hold -1 if the corresponding
 * fork() failed (main() does not check for that).
 */
static void signal_peer(pid_t target, int sig) {
    if (target > 0) {
        kill(target, sig);
    }
}

/*
 * Forward `sig` to the two processes other than `sender_pid`.
 *
 * The sender is recognised by comparing it with pid1/pid2/pid3.  Those
 * globals are only complete in the parent: fork() returns 0 in the child,
 * so Process 1 sees pid1 == pid2 == pid3 == 0, and Process 2 sees
 * pid2 == pid3 == 0.  When called from a child no branch matches and
 * nothing is forwarded.
 *
 * NOTE(review): this is called from signal handlers, and printf() is not
 * async-signal-safe (see signal-safety(7)).
 */
void propagate_signal(int sig, pid_t sender_pid) {
    if (sender_pid == pid1) {
        printf("Propagating signal from Process 1 (PID: %d) to Process 2 and Process 3\n", sender_pid);
        signal_peer(pid2, sig);
        signal_peer(pid3, sig);
    } else if (sender_pid == pid2) {
        printf("Propagating signal from Process 2 (PID: %d) to Process 1 and Process 3\n", sender_pid);
        signal_peer(pid1, sig);
        signal_peer(pid3, sig);
    } else if (sender_pid == pid3) {
        printf("Propagating signal from Process 3 (PID: %d) to Process 1 and Process 2\n", sender_pid);
        signal_peer(pid1, sig);
        signal_peer(pid2, sig);
    }
}

/* SIGUSR1 handler ("S1"): request termination and relay S1 to the peers. */
void handle_s1(int sig) {
    pid_t self = getpid();

    (void)sig;
    should_exit = 1;
    printf("Process %d: Received termination signal (S1)\n", self);
    propagate_signal(SIGUSR1, self);
}

/* SIGUSR2 handler ("S2"): enter the suspended state and relay S2 to the peers. */
void handle_s2(int sig) {
    pid_t self = getpid();

    (void)sig;
    is_suspended = 1;
    printf("Process %d: Received suspension signal (S2)\n", self);
    propagate_signal(SIGUSR2, self);
}

/* SIGCONT handler ("S3"): leave the suspended state and relay S3 to the peers. */
void handle_s3(int sig) {
    pid_t self = getpid();

    (void)sig;
    is_suspended = 0;
    printf("Process %d: Received resume signal (S3)\n", self);
    propagate_signal(SIGCONT, self);
}

/*
 * SIGINT handler ("S4", message handling): read one Message from this
 * process's FIFO without blocking, print it, and relay SIGINT to the peers.
 *
 * NOTE(review): the FIFOs are created as /tmp/fifo_1 .. /tmp/fifo_3
 * (FIFO_1..FIFO_3), and send_message_to_others() writes to those.  The path
 * built here embeds the PID instead, so it normally names a file that does
 * not exist and open() simply fails.  Mapping the PID to a process number
 * needs information this handler does not have.
 * NOTE(review): printf() is not async-signal-safe (see signal-safety(7)).
 */
void handle_s4(int sig) {
    Message msg;
    char fifo_path[32];   /* "/tmp/fifo_" + any int-sized pid + NUL fits */
    ssize_t got;
    int fifo_fd;

    (void)sig;
    /* bounded: the old 20-byte buffer could overflow with large pids */
    snprintf(fifo_path, sizeof fifo_path, "/tmp/fifo_%d", (int)getpid());

    fifo_fd = open(fifo_path, O_RDONLY | O_NONBLOCK);
    if (fifo_fd < 0) {
        return;
    }

    got = read(fifo_fd, &msg, sizeof msg);
    /* Only a complete record has every field initialised. */
    if (got == (ssize_t)sizeof msg) {
        /* Do not trust the sender to have NUL-terminated the text. */
        msg.message[sizeof msg.message - 1] = '\0';
        printf("Process %d: Received message from process %d: %s\n",
               getpid(), msg.sender_pid, msg.message);
        propagate_signal(SIGINT, getpid());
        if (msg.line_count > 0) {
            printf("Line count information received: %d\n", msg.line_count);
        }
    }
    close(fifo_fd);
}

/*
 * Write one Message (signal_type, line_count, caller's pid, and a copy of
 * msg truncated to 255 characters) to each of /tmp/fifo_1 .. /tmp/fifo_3.
 *
 * Best effort: a FIFO that cannot be opened is skipped silently.  With
 * O_NONBLOCK, open() fails with ENXIO while nobody has the FIFO open for
 * reading (fifo(7)).  A short or failed write() is reported with perror().
 */
void send_message_to_others(int signal_type, const char* msg, int line_count) {
    Message message;

    /* All-zero start: no uninitialised stack bytes go on the wire, and
     * message[] is always NUL-terminated. */
    memset(&message, 0, sizeof message);
    message.signal_type = signal_type;
    message.sender_pid = getpid();
    message.line_count = line_count;
    snprintf(message.message, sizeof message.message, "%s", msg);

    for (int i = 1; i <= 3; i++) {
        char fifo_path[20];
        snprintf(fifo_path, sizeof fifo_path, "/tmp/fifo_%d", i);

        int fd = open(fifo_path, O_WRONLY | O_NONBLOCK);
        if (fd < 0) {
            continue;
        }
        if (write(fd, &message, sizeof message) != (ssize_t)sizeof message) {
            perror("write to fifo");
        }
        close(fd);
    }
}

/*
 * Entry point: run the three-process pipeline on the file named by argv[1].
 *
 *   Process 1 (first child)   reads the file with fgets() and writes each
 *                             line into pipe1.
 *   Process 2 (second child)  reads pipe1 one byte at a time, counts '\n'
 *                             bytes and writes the running count (an int)
 *                             into pipe2.
 *   Process 3 (the parent)    reads the counts from pipe2, then waits for
 *                             both children and removes the FIFOs.
 *
 * Each process also calls send_message_to_others() after every step.
 * Exits with status 1 on a usage error or a pipe() failure (Process 1 exits
 * with 1 if fopen() fails); otherwise the parent returns 0.
 */
int main(int argc, char *argv[]) {
    if (argc != 2) {
        fprintf(stderr, "Usage: %s <file_path>\n", argv[0]);
        exit(1);
    }

    /* NOTE(review): return values are ignored.  EEXIST left over from an
     * earlier run is harmless, but any other mkfifo() failure goes unnoticed. */
    mkfifo(FIFO_1, 0666);
    mkfifo(FIFO_2, 0666);
    mkfifo(FIFO_3, 0666);

    /* Installed before fork(), so both children inherit these dispositions. */
    signal(SIGUSR1, handle_s1);
    signal(SIGUSR2, handle_s2);
    signal(SIGCONT, handle_s3);
    signal(SIGINT, handle_s4);

    int pipe1[2], pipe2[2];
    start_time = time(NULL);

    if (pipe(pipe1) == -1 || pipe(pipe2) == -1) {
        perror("pipe");
        exit(1);
    }

    /*
     * Process 1 (reader).  fork() returns 0 in the child, so inside this
     * process pid1 == 0.  pid2/pid3 are assigned later, in the parent only,
     * and stay 0 here.  Process 1 therefore cannot recognise itself or
     * address its peers in propagate_signal().
     * NOTE(review): a failed fork() (-1) is not checked.
     */
    pid1 = fork();
    if (pid1 == 0) { 
        pid_t pid = getpid();
        printf("Process 1 (PID: %d) started\n", pid);
        /* Keep only the write end of pipe1. */
        close(pipe1[0]);
        close(pipe2[0]);
        close(pipe2[1]);

        FILE *file = fopen(argv[1], "r");
        if (file == NULL) {
            perror("fopen");
            exit(1);
        }

        char buffer[BUFFER_SIZE];
        size_t bytes_read;
        int line_number = 0;

        while (fgets(buffer, BUFFER_SIZE, file) != NULL && !should_exit) {
            /* Poll every 100 ms while S2 (suspend) is in effect. */
            while(is_suspended) {
                usleep(100000); 
            }
            bytes_read = strlen(buffer);
            /* write() returns ssize_t; -1 converts to SIZE_MAX in this
             * comparison, so errors are still detected. */
            if (write(pipe1[1], buffer, bytes_read) != bytes_read) {
                perror("write to pipe");
                break;
            }
            line_number++;
            printf("Process 1: Read line %d: %s", line_number, buffer);
            send_message_to_others(4, "Read new line", line_number);
            usleep(100000);
        }

        printf("Process 1: Finished reading file (read %d lines)\n", line_number);
        fclose(file);
        close(pipe1[1]);
        exit(0);
    }

    /*
     * Process 2 (counter).  Here pid1 was assigned before this fork and is
     * known, but pid2 == 0 (fork's return value in the child) and pid3 is
     * still 0.
     */
    pid2 = fork();
    if (pid2 == 0) { 
        pid_t pid = getpid();
        printf("Process 2 (PID: %d) started\n", pid);
        /* Keep pipe1's read end and pipe2's write end. */
        close(pipe1[1]);
        close(pipe2[0]);

        char ch;
        int line_count = 0;
        ssize_t bytes_read;
        int last_reported_count = 0;

        /* One byte per read(); the loop ends at EOF (Process 1 closed pipe1). */
        while ((bytes_read = read(pipe1[0], &ch, 1)) > 0 && !should_exit) {
            while(is_suspended) {
                usleep(100000); 
            }
            if (ch == '\n') {
                line_count++;
                /* Always true: line_count was just incremented past the
                 * value recorded at the previous report. */
                if (line_count != last_reported_count) {
                    printf("Process 2: Counted line %d\n", line_count);
                    write(pipe2[1], &line_count, sizeof(line_count));
                    send_message_to_others(4, "Counted line", line_count);
                    last_reported_count = line_count;
                }
            }
        }

        printf("Process 2: Finishing operation\n");
        close(pipe1[0]);
        close(pipe2[1]);
        exit(0);
    }

    /* Process 3 is the parent itself.  Only here are pid1, pid2 and pid3 all
     * set, so only the parent's propagate_signal() calls forward anything. */
    pid3 = getpid();
    printf("Process 3 (main, PID: %d) started\n", pid3);
    /* Keep only the read end of pipe2. */
    close(pipe1[0]);
    close(pipe1[1]);
    close(pipe2[1]);

    int last_count = 0;
    int final_count;

    /* read() returns 0 once Process 2, the only remaining writer of pipe2,
     * has closed its end. */
    while (read(pipe2[0], &final_count, sizeof(final_count)) > 0 && !should_exit) {
        while(is_suspended) {
            usleep(100000); 
        }
        if (final_count != last_count) {
            printf("Main process: Updated line count: %d\n", final_count);
            send_message_to_others(4, "Updated line count", final_count);
            last_count = final_count;
        }
    }

    printf("Main process: Finishing operation. Final line count: %d\n", last_count);

    waitpid(pid1, NULL, 0);
    waitpid(pid2, NULL, 0);

    close(pipe2[0]);
    unlink(FIFO_1);
    unlink(FIFO_2);
    unlink(FIFO_3);

    return 0;
}

Upvotes: 3

Views: 85

Answers (1)

Craig Estey
Craig Estey

Reputation: 33631

You fork your processes in 1,2,3 order.

Remember that fork returns a pid to the parent and 0 to the child process.

Thus, for a given pid (e.g. pidX where X is 1, 2, or 3), it can only know about a lower number pid. For each process launched/forked, designated 1-3:

  1. can send to nobody else (pid1, pid2 and pid3 are 0).
  2. can send to pid1 (pid2 and pid3 will be 0).
  3. can send to pid2 and pid1 (pid3 will be zero).

With your code, I bet some pidX are 0. To verify this, have each child process do an immediate:

printf("%d: pid1=%d pid2=%d pid3=%d\n",getpid(),pid1,pid2,pid3);

So, for example, if you're trying to send from pid1 to pid2 and pid3, they will be 0. This means that when pid1 does kill(pid2, SIGTERM), it is really doing kill(0, SIGTERM). (Note that kill takes the pid first and the signal number second.)

Note that when we do kill(0, signo), we are sending the signal to all members of our process group. This includes our own pid, the pid of our parent, and any other children our parent forks. A forked child inherits its parent's process group; when the parent was started from a job-control shell, that group ID is normally the parent's pid.

So, with your current code, if pid1 has any effect on any of the other processes (pid2, pid3), it's only because it's serendipitously doing kill(0, signo).

  • Method A: So, just having each process do a kill(0, signo) would probably be better. It means that the process would have to keep track of a few more things because it will send a signal to itself.

  • Method B: An alternative: if the parent's pid is (e.g.) parent_pid, have each child send just one signal to the parent (e.g.) kill(parent_pid, SIGTERM). Then, have the parent do: kill(0, SIGTERM)

To prevent "infinite" signal "thrashing", you may want to mask off signals so that only certain processes will receive certain signals. You can set up a per-process signal mask with sigprocmask

So, with method A, parent should block all signals. Then, the first thing a child does (after the fork) is unblock them. Then, process group kill should work as desired.

Side note: printf is not async-signal-safe, so you must not call it from a signal handler. See: man 7 signal-safety


UPDATE:

Doing sprintf(fifo_path,"/tmp/fifo_%d",getpid()) in handle_s4 will never produce /tmp/fifo_1 et al.: it embeds the process ID, not the process number 1-3.

I've added a safe version of printf.

And, a lot more error checking.

Note that when we get to pid3, because the code does not have: if (pid3 == 0) { ... } like the others, it will be executed twice. Once by the child and once by the parent. So, I put that code under such an if

And, some heavy rework. Using sigaction

It is still broken but it may give you some ideas. It still fails the open in send_message_to_others for the fifo.

These errors could be latent bugs by you (because the message couldn't have worked because of the bug in handle_s4). Or, they could be bugs I've created. I didn't have time to debug further.

Personally, I might replace the mkfifo et al. with SysV IPC: msgget/msgsnd/msgrcv [as I've never had a problem with those]:

  1. Parent does msgget to create one queue
  2. The messages are differentiated by the msg_id (e.g. value of 1, 2, or 3) which denote the "destination".

Anyway, here's the code:

Edit: Because of UPDATE #3, to stay within SO space limits, I've had to redact this [early] version of the code in favor of the later version below.


UPDATE #2:

I misread your intention regarding pid3. That is, I didn't see that you were doing pid3 = getpid(); and thought you were doing pid3 = fork();. My bad ;-) After other fixes, your original code seems to work better.

I couldn't get the fifo code to work. It kept giving ENXIO errors in send_message_to_others. So, I changed the code to use SysV IPC (by default). If you'd like to see the original behavior using mkfifo, compile with -DUSE_FIFO

Now, the code at least runs to completion. I'm not totally sure about your full intent with this regarding all the cross-signalling, so I'll leave it at that.

So, here is the updated code:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <string.h>
#include <signal.h>
#include <time.h>
#if 1
#include <errno.h>
#include <stdarg.h>
#include <sys/file.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#endif

// Build-time switches (override with -D...):
//   USE_PIDM  1: the master process itself acts as "process 3"
//             0: process 3 is forked as a third child
//   USE_FIFO  0: messages travel through a SysV message queue
//             nonzero: through /tmp/fifo_N (>= 2 selects blocking opens)
#ifndef USE_PIDM
# define USE_PIDM   1
#endif

#ifndef USE_FIFO
# define USE_FIFO   0
#endif

// number of elements in a true array (not a pointer)
#define countof(_x)     (sizeof(_x) / sizeof((_x)[0]))

// GOTSIG(who) -- log entry into an SA_SIGINFO handler; needs `sig` and `info`
// in scope; prints the sender pid both absolute and relative to pidm
#define GOTSIG(_who) \
    prtf(_who ": sig=%d si_pid=%d/%d\n", sig, info->si_pid, info->si_pid - pidm)

#define BUFFER_SIZE 1024

#if USE_FIFO
#if 0
#define FIFO_1 "/tmp/fifo_1"
#define FIFO_2 "/tmp/fifo_2"
#define FIFO_3 "/tmp/fifo_3"
#else
// FIFO path per process index 1-3.  Slot 0 is unused: handle_s4 maps the
// master (index 0) to slot 3.
const char *const fifolist[] = {
    [0] = NULL,
    [1] = "/tmp/fifo_1",
    [2] = "/tmp/fifo_2",
    [3] = "/tmp/fifo_3",
};
#endif
#else
// SysV message-queue identifier returned by msgget() (not a file descriptor,
// despite the name)
int msg_fd;
#endif

// One message exchanged between the processes.
//
// In the SysV build (USE_FIFO == 0), msgsnd/msgrcv require the first member
// to be a long message type; it carries the destination process index 1-3.
typedef struct ipc_message {
#if USE_FIFO == 0
    long msg_id;            // SysV message type == destination index
#endif
    int signal_type;        // sender-chosen code (callers in this file pass 4)
    pid_t sender_pid;       // getpid() of the sender
    char message[256];      // free-form text
    int line_count;         // line number / count being reported
} Message;

// Flags written by the signal handlers and polled by the worker loops in main.
volatile sig_atomic_t is_suspended = 0;     // set by S2 (SIGUSR2), cleared by S3 (SIGCONT)
volatile sig_atomic_t should_exit = 0;      // set by S1 (SIGUSR1)
time_t start_time;                          // wall-clock start, assigned in main

#if 0
pid_t pid1, pid2, pid3;
#else
// pidlist[0] is the master (pidm); pidlist[1..3] are processes 1-3.
// pidm/pid1/pid2/pid3 are lvalue aliases, so the original names keep working
// while handle_hup can iterate over the whole array.
// NOTE: a process only sees the entries filled in before its own fork();
// fork() itself leaves 0 in the child's own slot.
pid_t pidlist[4];
#define pidm    pidlist[0]
#define pid1    pidlist[1]
#define pid2    pidlist[2]
#define pid3    pidlist[3]
#endif

pid_t pidself;                  // cached value of getpid()
int xidself;                    // 0-3: index of this process (0 = master)
int xidnew;                     // 0-3: index handed to the next child (bumped before each fork)

// prtattr(n) -- declare a printf-like function: argument n is the format
// and the variadic arguments start at n + 1 (enables -Wformat checks)
#define prtattr(_lvl) \
    __attribute__((__format__(__printf__,_lvl,_lvl + 1)))

// value of tscgetf() captured at program start; subtracted by tscgetf()
double tsczero;

// tscgetf -- CLOCK_MONOTONIC time in seconds, relative to tsczero
double
tscgetf(void)
{
    struct timespec now;

    clock_gettime(CLOCK_MONOTONIC,&now);

    double elapsed = (double) now.tv_nsec / 1e9 + (double) now.tv_sec;

    return elapsed - tsczero;
}

// prtfault -- log with prtf() and terminate the process with status 1
#define prtfault(_fmt...) \
    do { \
        prtf(_fmt); \
        exit(1); \
    } while (0)

// prtf -- printf-style logging with one write(2) per call
//
// Every line is prefixed with "[<seconds since tsczero> X:<xidself> P:<pidself>] ".
// The text is formatted into a stack buffer, so stdio's buffered printf is
// avoided.  It is emitted with a single write(2) on fd 1 while an flock() is
// held on it, so lines from different processes do not interleave.
// NOTE(review): snprintf/vsnprintf are not on the POSIX async-signal-safe
// list either -- this is "safer" than printf, not strictly signal-safe.
//
// Output that does not fit in the buffer is truncated and ended with '\n'.
// The previous sprintf/vsprintf version overflowed the 1000-byte buffer,
// because callers log file lines of up to BUFFER_SIZE (1024) bytes.
void prtattr(1)
prtf(const char *fmt,...)
{
    va_list ap;
    char buf[1000];
    size_t len = 0;
    int need;

    // show which pid said what
    need = snprintf(buf,sizeof(buf),"[%.9f X:%d P:%d] ",
        tscgetf(),xidself,pidself);
    if (need > 0)
        len = ((size_t) need < sizeof(buf)) ? (size_t) need : sizeof(buf) - 1;

    va_start(ap,fmt);
    need = vsnprintf(buf + len,sizeof(buf) - len,fmt,ap);
    va_end(ap);

    if (need > 0) {
        if ((size_t) need < sizeof(buf) - len)
            len += (size_t) need;
        else {
            // truncated: the buffer is full -- end the line at its last byte
            len = sizeof(buf) - 1;
            buf[len - 1] = '\n';
        }
    }

    flock(1,LOCK_EX);
    write(1,buf,len);
    flock(1,LOCK_UN);
}

void
propagate_signal(int sig, pid_t sender_pid)
{
#if 0
    if (sender_pid == pid1) {
        prtf("Propagating signal from Process 1 (PID: %d) to Process 2 and Process 3\n", sender_pid);
        kill(pid2, sig);
        kill(pid3, sig);
    }
    else if (sender_pid == pid2) {
        prtf("Propagating signal from Process 2 (PID: %d) to Process 1 and Process 3\n", sender_pid);
        kill(pid1, sig);
        kill(pid3, sig);
    }
    else if (sender_pid == pid3) {
        prtf("Propagating signal from Process 3 (PID: %d) to Process 1 and Process 2\n", sender_pid);
        kill(pid1, sig);
        kill(pid2, sig);
    }
#endif

#if 0
    kill(0,sig);
#endif

#if 1
    prtf("propagate_signal: sig=%d\n",sig);
    union sigval sigval;
    sigval.sival_int = 0;
    sigqueue(pidm,SIGHUP,sigval);
#endif
}

// handle_s1 -- SIGUSR1 ("S1", terminate): log, raise should_exit so the
// worker loops in main stop, then ask the master to relay the signal
void
handle_s1(int sig,siginfo_t *info,void *vp)
{
    GOTSIG("handle_s1");
    prtf("Received termination signal (S1)\n");
    should_exit = 1;
    propagate_signal(SIGUSR1, pidself);
}

// handle_s2 -- SIGUSR2 ("S2", suspend): set is_suspended so the worker
// loops in main poll until S3 clears it
//
// The former `kill(SIGUSR2,pidself)` had its arguments reversed (kill takes
// (pid, sig)), so it always failed with EINVAL and did nothing.  It has been
// removed rather than corrected.  A handler that sends its own signal to its
// own process is re-entered as soon as it returns, forever.
void
handle_s2(int sig,siginfo_t *info,void *vp)
{
    GOTSIG("handle_s2");
    is_suspended = 1;
    prtf("Received suspension signal (S2)\n");
#if 0
    propagate_signal(SIGUSR2, pidself);
#endif
}

// handle_s3 -- SIGCONT ("S3", resume): clear is_suspended so the worker
// loops in main continue
//
// The former `kill(SIGCONT,pidself)` had its arguments reversed (kill takes
// (pid, sig)), so it always failed with EINVAL and did nothing.  It has been
// removed rather than corrected.  Sending SIGCONT to ourselves from the
// SIGCONT handler would re-enter this handler forever.
void
handle_s3(int sig,siginfo_t *info,void *vp)
{
    GOTSIG("handle_s3");
    is_suspended = 0;
    prtf("Received resume signal (S3)\n");
#if 0
    propagate_signal(SIGCONT, pidself);
#endif
}

// handle_s4 -- SIGINT ("S4", message): fetch one message addressed to this
// process, log it, and ask the master to relay the signal
//
// The mailbox is chosen by process index.  The master (xidself 0) uses
// index 3, since it is process 3 under USE_PIDM.  In the SysV build nothing
// happens when no message is waiting.  In the fifo build the process exits
// via prtfault if the FIFO cannot be opened.
void
handle_s4(int sig,siginfo_t *info,void *vp)
{
    Message msg;

    GOTSIG("handle_s4");
#if USE_FIFO
// NOTE/BUG: this won't be fifo_1, fifo_2, fifo_3
#if 0
    char fifo_path[20];
    sprintf(fifo_path, "/tmp/fifo_%d", getpid());
#endif
#if 0
    const char *fifo_path = fifolist[xidself];
#else
    const char *fifo_path = fifolist[xidself ? xidself : 3];
#endif

#if USE_FIFO >= 2
    prtf("handle_s4: open fifo_path='%s'\n",fifo_path);
    int fifo_fd = open(fifo_path, O_RDONLY);
#else
    int fifo_fd = open(fifo_path, O_RDONLY | O_NONBLOCK);
#endif

    if (fifo_fd >= 0) {
        prtf("handle_s4: read ...\n");
        if (read(fifo_fd, &msg, sizeof(msg)) > 0) {
            // do not trust the sender to have NUL-terminated the text
            msg.message[sizeof(msg.message) - 1] = 0;
            prtf("Received message from process %d: %s\n",
                msg.sender_pid, msg.message);
            propagate_signal(SIGINT, pidself);
            if (msg.line_count > 0) {
                prtf("Line count information received: %d\n", msg.line_count);
            }
        }

        close(fifo_fd);
    }
    else
        prtfault("handle_s4: xidself=%d fifo_path='%s' open failed -- %s\n",
            xidself,fifo_path,strerror(errno));
#else
    // Same index mapping as the fifo branch.  A msgtyp of 0 would take the
    // first message of *any* type and steal mail addressed to processes 1/2.
    long msgtyp = xidself ? xidself : 3;

    prtf("handle_s4: msgrcv xidself=%d ...\n",xidself);

    // msgsz counts only the bytes after the leading long (msg_id); passing
    // sizeof(msg) let the kernel write sizeof(long) bytes past `msg`.
    // IPC_NOWAIT: never block inside a signal handler.
    // MSG_NOERROR: truncate an oversized message instead of failing.
    if (msgrcv(msg_fd, &msg, sizeof(msg) - sizeof(msg.msg_id), msgtyp,
        IPC_NOWAIT | MSG_NOERROR) > 0) {
        // do not trust the sender to have NUL-terminated the text
        msg.message[sizeof(msg.message) - 1] = 0;
        prtf("Received message from process %d: %s\n",
            msg.sender_pid, msg.message);
        propagate_signal(SIGINT, pidself);
        if (msg.line_count > 0) {
            prtf("Line count information received: %d\n", msg.line_count);
        }
    }
#endif
}

// handle_hup -- SIGHUP: relay point used by propagate_signal()
//
// propagate_signal() sigqueue()s SIGHUP to the master (pidm).  The master
// re-sends SIGHUP to every known process except itself and the original
// sender (info->si_pid).
//
// Only the master relays.  A child's pidlist is incomplete: fork() leaves 0
// in the child's own slot, and later siblings are forked after it.  A
// relaying child therefore did kill(0,SIGHUP), which signals the whole
// process group, every member of which relayed again -- an endless SIGHUP
// storm.  Entries <= 0 (unset, or a failed fork) are skipped for the same
// reason; kill(-1,...) would signal every process we are allowed to.
void
handle_hup(int sig,siginfo_t *info,void *vp)
{

    GOTSIG("handle_hup");

    if (pidself != pidm)
        return;

    for (int idx = 0;  idx < countof(pidlist);  ++idx) {
        pid_t pidcur = pidlist[idx];

        // unknown pid -- never let it become a group/broadcast kill
        if (pidcur <= 0)
            continue;

        // don't send to self
        if (pidcur == pidself)
            continue;

        // don't send to originator
        if (pidcur == info->si_pid)
            continue;

        prtf("handle_hup: sending to idx=%d pidcur=%d\n",idx,pidcur);
        kill(pidcur,sig);
    }
}

// send_message_to_others -- post one Message to each of processes 1-3
//
// The message carries signal_type, line_count, the caller's pid, and a copy
// of msg (truncated to fit).  With USE_FIFO it is written to /tmp/fifo_1..3,
// and the process exits via prtfault if a FIFO cannot be opened.  Otherwise
// it is queued on the SysV queue msg_fd with msg_id = destination index; a
// message that cannot be queued is logged and dropped.
void
send_message_to_others(int signal_type, const char *msg, int line_count)
{
    Message message;

    // All-zero start: no uninitialised stack bytes are sent, and message[]
    // stays NUL-terminated (strncpy(..., 255) did not guarantee that).
    memset(&message,0,sizeof(message));

    message.signal_type = signal_type;
    message.sender_pid = getpid();
    message.line_count = line_count;
    snprintf(message.message,sizeof(message.message),"%s",msg);

#if USE_FIFO
#if 0
    char fifo_path[20];
#else
    const char *fifo_path;
#endif

    for (int i = 1; i <= 3; i++) {
        fifo_path = fifolist[i];
        prtf("send_message_to_others: open fifo_path='%s'\n",fifo_path);
#if USE_FIFO >= 2
        int fd = open(fifo_path, O_WRONLY);
#else
        int fd = open(fifo_path, O_WRONLY | O_NONBLOCK);
#endif

        if (fd >= 0) {
            int xlen = write(fd, &message, sizeof(message));
            prtf("send_message_to_others: i=%d xlen=%d sizeof=%zu\n",
                i,xlen,sizeof(message));
            close(fd);
        }
        else
            prtfault("send_message_to_others: i=%d fifo_path='%s' open failed -- %s\n",
                i,fifo_path,strerror(errno));
    }
#else
    // msgsnd's size argument excludes the leading long (msg_id); passing
    // sizeof(message) made the kernel read sizeof(long) bytes past the struct
    size_t msgsz = sizeof(message) - sizeof(message.msg_id);

    for (int i = 1; i <= 3; i++) {
        message.msg_id = i;

        // IPC_NOWAIT: messages are only drained by handle_s4 (on SIGINT), so
        // the queue fills up; without it msgsnd would block here forever.
        int xlen = msgsnd(msg_fd, &message, msgsz, IPC_NOWAIT);
        if (xlen < 0) {
            prtf("send_message_to_others: i=%d msgsnd failed -- %s\n",
                i,strerror(errno));
            continue;
        }

        prtf("send_message_to_others: i=%d xlen=%d sizeof=%zu\n",
            i,xlen,sizeof(message));
    }
#endif
}

// setup_handler -- install a three-argument (SA_SIGINFO) handler for signo
//
// SA_SIGINFO is required when using sa_sigaction.  Without it the handler is
// invoked as a one-argument sa_handler and its info/vp arguments are not
// valid, yet the handlers here dereference info (GOTSIG reads info->si_pid).
// No other signals are blocked while the handler runs, and SA_RESTART is not
// set.  Exits with status 1 if sigaction() fails.
void
setup_handler(int signo,void (*handler)(int,siginfo_t *,void *))
{
    struct sigaction act;

    memset(&act,0,sizeof(act));
    sigemptyset(&act.sa_mask);
    act.sa_flags = SA_SIGINFO;
    act.sa_sigaction = handler;

    if (sigaction(signo,&act,NULL) < 0) {
        perror("sigaction");
        exit(1);
    }
}

// start_child -- first call in every freshly forked child: cache this
// process's pid for prtf() and adopt the index the parent put in xidnew
void
start_child(void)
{

    pidself = getpid();
    xidself = xidnew;

    prtf("start_child: started\n");
}

// main -- set up IPC and signal handlers, fork the worker processes, and act
// as process 3 (the line-count collector)
//
// argv[1] is the file to read.  Process 1 reads it line by line into pipe1.
// Process 2 counts '\n' bytes from pipe1 and writes the running count (an
// int) into pipe2.  Process 3 reads the counts from pipe2.  With USE_PIDM
// (the default) process 3 is this master process itself; otherwise it is
// forked as a third child.  The master reaps every child, then removes the
// FIFOs (USE_FIFO) or the SysV message queue, and returns 0.
int
main(int argc, char *argv[])
{

    if (argc != 2) {
        fprintf(stderr, "Usage: %s <file_path>\n", argv[0]);
        exit(1);
    }

    tsczero = tscgetf();

    // master/parent pid
    pidm = getpid();
    pidself = pidm;
    prtf("main: pidm=%d\n",pidm);

#if USE_FIFO
    for (int idx = 1;  idx < countof(fifolist);  ++idx) {
        const char *file = fifolist[idx];
        unlink(file);
        int err = mkfifo(file, 0666);

        if (err < 0)
            prtfault("mkfifo: %s -- %s\n",file,strerror(errno));
    }
#else
    // IPC_PRIVATE creates a fresh queue; the children use it through the
    // msg_fd value inherited across fork().  It is removed at the end.
    msg_fd = msgget(IPC_PRIVATE,IPC_CREAT | 0600);
    if (msg_fd < 0)
        prtfault("main: msgget fail -- %s\n",strerror(errno));
#endif

#if 0
    signal(SIGUSR1, handle_s1);
    signal(SIGUSR2, handle_s2);
    signal(SIGCONT, handle_s3);
    signal(SIGINT, handle_s4);
#else
    // installed before forking, so every child inherits the same handlers
    setup_handler(SIGUSR1, handle_s1);
    setup_handler(SIGUSR2, handle_s2);
    setup_handler(SIGCONT, handle_s3);
    setup_handler(SIGINT, handle_s4);
    setup_handler(SIGHUP, handle_hup);
#endif

    int pipe1[2], pipe2[2];

    start_time = time(NULL);

    if (pipe(pipe1) == -1 || pipe(pipe2) == -1) {
        perror("pipe");
        exit(1);
    }

    // process 1: reader -- file lines -> pipe1
    ++xidnew;
    pid1 = fork();

    if (pid1 == 0) {
        start_child();

        close(pipe1[0]);
        close(pipe2[0]);
        close(pipe2[1]);

        FILE *file = fopen(argv[1], "r");
        if (file == NULL) {
            perror("fopen");
            exit(1);
        }

        char buffer[BUFFER_SIZE];
        size_t bytes_read;
        int line_number = 0;

        while (fgets(buffer, BUFFER_SIZE, file) != NULL && !should_exit) {
            while (is_suspended) {
                usleep(100000);
            }
            bytes_read = strlen(buffer);
            if (write(pipe1[1], buffer, bytes_read) != bytes_read) {
                perror("write to pipe");
                break;
            }
            line_number++;
            prtf("Read line %d: %s", line_number, buffer);
            send_message_to_others(4, "Read new line", line_number);
            usleep(100000);
        }

        prtf("Finished reading file (read %d lines)\n", line_number);
        fclose(file);
        close(pipe1[1]);
        exit(0);
    }

    // process 2: counter -- bytes from pipe1 -> running line count on pipe2
    ++xidnew;
    pid2 = fork();

    if (pid2 == 0) {
        start_child();

        close(pipe1[1]);
        close(pipe2[0]);

        char ch;
        int line_count = 0;
        ssize_t bytes_read;
        int last_reported_count = 0;

        while ((bytes_read = read(pipe1[0], &ch, 1)) > 0 && !should_exit) {
            while (is_suspended) {
                usleep(100000);
            }
            if (ch == '\n') {
                line_count++;
                if (line_count != last_reported_count) {
                    prtf("Counted line %d\n", line_count);
                    write(pipe2[1], &line_count, sizeof(line_count));
                    send_message_to_others(4, "Counted line", line_count);
                    last_reported_count = line_count;
                }
            }
        }

        prtf("Finishing operation\n");
        close(pipe1[0]);
        close(pipe2[1]);
        exit(0);
    }

    // process 3: collector -- counts from pipe2
#if USE_PIDM
    pid3 = pidm;
    if (1) {
        prtf("main: starting pid3 as parent\n");
#else
    ++xidnew;
    pid3 = fork();
    if (pid3 == 0) {
        start_child();
#endif

        close(pipe1[0]);
        close(pipe1[1]);
        close(pipe2[1]);

        int last_count = 0;
        int final_count;

        while (read(pipe2[0], &final_count, sizeof(final_count)) > 0 &&
            !should_exit) {
            while (is_suspended) {
                usleep(100000);
            }
            if (final_count != last_count) {
                prtf("Updated line count: %d\n", final_count);
                send_message_to_others(4, "Updated line count", final_count);
                last_count = final_count;
            }
        }

        prtf("Finishing operation. Final line count: %d\n", last_count);
#if USE_PIDM == 0
        exit(0);
#endif
    }

#if 0
    waitpid(pid1, NULL, 0);
    waitpid(pid2, NULL, 0);
#else
    while (1) {
        prtf("Waiting\n");
        pid_t pidreap = wait(NULL);
        if (pidreap < 0) {
            // The handlers are installed without SA_RESTART, so a signal can
            // interrupt wait() while children are still running.  Only stop
            // on a real error (ECHILD: nothing left to reap).
            if (errno == EINTR)
                continue;
            break;
        }
        prtf("Reaping %d/%d\n",pidreap,pidreap - pidm);
    }
#endif

    close(pipe2[0]);
#if USE_FIFO
    for (int idx = 1;  idx < countof(fifolist);  ++idx)
        unlink(fifolist[idx]);
#else
    // A SysV queue outlives every process that used it; without this, each
    // run leaks one queue until reboot or ipcrm.
    if (msgctl(msg_fd,IPC_RMID,NULL) < 0)
        prtf("main: msgctl IPC_RMID fail -- %s\n",strerror(errno));
#endif

    return 0;
}

In the code above, I've used cpp conditionals to denote old vs. new code:

#if 0
// old code
#else
// new code
#endif

#if 1
// new code
#endif

Note: this can be cleaned up by running the file through unifdef -k

For this particular code: unifdef -k -iUUSE_FIFO -iDUSE_PIDM, could get a [fully] cleaned up version.


Here is the input file I used:

hello world
goodbye galaxy
universe eternal

Here is the program output when compiled with -DUSE_FIFO:

[0.000004085 X:0 P:595644] main: pidm=595644
[0.000346837 X:1 P:595645] start_child: started
[0.000381570 X:0 P:595644] main: starting pid3 as parent
[0.000421851 X:2 P:595646] start_child: started
[0.000479973 X:1 P:595645] Read line 1: hello world
[0.000487642 X:2 P:595646] Counted line 1
[0.000503542 X:1 P:595645] send_message_to_others: i=1 fifo_path='/tmp/fifo_1' open failed -- No such device or address
[0.000517247 X:2 P:595646] send_message_to_others: i=1 fifo_path='/tmp/fifo_1' open failed -- No such device or address
[0.000538922 X:0 P:595644] Updated line count: 1
[0.000566704 X:0 P:595644] send_message_to_others: i=1 fifo_path='/tmp/fifo_1' open failed -- No such device or address

Here is the program output using SysV IPC:

[0.000001998 X:0 P:595688] main: pidm=595688
[0.000619131 X:0 P:595688] main: starting pid3 as parent
[0.000642203 X:1 P:595689] start_child: started
[0.000678053 X:2 P:595690] start_child: started
[0.000774182 X:1 P:595689] Read line 1: hello world
[0.000813729 X:2 P:595690] Counted line 1
[0.000879997 X:0 P:595688] Updated line count: 1
[0.001350095 X:1 P:595689] send_message_to_others: i=1 xlen=0 sizeof=280
[0.001352684 X:2 P:595690] send_message_to_others: i=1 xlen=0 sizeof=280
[0.001355468 X:0 P:595688] send_message_to_others: i=1 xlen=0 sizeof=280
[0.001363248 X:1 P:595689] send_message_to_others: i=2 xlen=0 sizeof=280
[0.001364265 X:2 P:595690] send_message_to_others: i=2 xlen=0 sizeof=280
[0.001365704 X:0 P:595688] send_message_to_others: i=2 xlen=0 sizeof=280
[0.001370134 X:2 P:595690] send_message_to_others: i=3 xlen=0 sizeof=280
[0.001370989 X:0 P:595688] send_message_to_others: i=3 xlen=0 sizeof=280
[0.001372129 X:1 P:595689] send_message_to_others: i=3 xlen=0 sizeof=280
[0.101520156 X:1 P:595689] Read line 2: goodbye galaxy
[0.101565808 X:1 P:595689] send_message_to_others: i=1 xlen=0 sizeof=280
[0.101581843 X:1 P:595689] send_message_to_others: i=2 xlen=0 sizeof=280
[0.101597385 X:1 P:595689] send_message_to_others: i=3 xlen=0 sizeof=280
[0.101611671 X:2 P:595690] Counted line 2
[0.101636212 X:2 P:595690] send_message_to_others: i=1 xlen=0 sizeof=280
[0.101641843 X:0 P:595688] Updated line count: 2
[0.101649104 X:2 P:595690] send_message_to_others: i=2 xlen=0 sizeof=280
[0.101658820 X:0 P:595688] send_message_to_others: i=1 xlen=0 sizeof=280
[0.101660533 X:2 P:595690] send_message_to_others: i=3 xlen=0 sizeof=280
[0.101669473 X:0 P:595688] send_message_to_others: i=2 xlen=0 sizeof=280
[0.101678881 X:0 P:595688] send_message_to_others: i=3 xlen=0 sizeof=280
[0.201782499 X:1 P:595689] Read line 3: universe eternal
[0.201823202 X:1 P:595689] send_message_to_others: i=1 xlen=0 sizeof=280
[0.201838128 X:1 P:595689] send_message_to_others: i=2 xlen=0 sizeof=280
[0.201846063 X:1 P:595689] send_message_to_others: i=3 xlen=0 sizeof=280
[0.201873574 X:2 P:595690] Counted line 3
[0.201902347 X:2 P:595690] send_message_to_others: i=1 xlen=0 sizeof=280
[0.201915274 X:2 P:595690] send_message_to_others: i=2 xlen=0 sizeof=280
[0.201922945 X:2 P:595690] send_message_to_others: i=3 xlen=0 sizeof=280
[0.201969388 X:0 P:595688] Updated line count: 3
[0.201994417 X:0 P:595688] send_message_to_others: i=1 xlen=0 sizeof=280
[0.202010472 X:0 P:595688] send_message_to_others: i=2 xlen=0 sizeof=280
[0.202022218 X:0 P:595688] send_message_to_others: i=3 xlen=0 sizeof=280
[0.301994886 X:1 P:595689] Finished reading file (read 3 lines)
[0.302076881 X:2 P:595690] Finishing operation
[0.302118434 X:0 P:595688] Finishing operation. Final line count: 3
[0.302132616 X:0 P:595688] Waiting
[0.302194757 X:0 P:595688] Reaping 595689/1
[0.302204503 X:0 P:595688] Waiting
[0.302265047 X:0 P:595688] Reaping 595690/2
[0.302272118 X:0 P:595688] Waiting

UPDATE #3:

"man 7 fifo" explain your fifo error. – stevea

It's really OP's error (which he couldn't get to because of the sprintf(fifo_path,"/tmp/fifo_%d",getpid()) issue I mentioned previously).

From man 7 fifo:

A process can open a FIFO in nonblocking mode. In this case, opening for read only will succeed even if no-one has opened on the write side yet, opening for write only will fail with ENXIO (no such device or address) unless the other end has already been opened.

So, I changed the code above to do a blocking open (without O_NONBLOCK) if USE_FIFO >= 2.

It fixes the ENXIO error but doesn't solve the overall problem.

Now, unfortunately, send_message_to_others just hangs on the open:

[0.000004437 X:0 P:656979] main: pidm=656979
[0.000427016 X:0 P:656979] main: starting pid3 as parent
[0.000458723 X:1 P:656980] start_child: started
[0.000517052 X:2 P:656981] start_child: started
[0.000606265 X:1 P:656980] Read line 1: hello world
[0.000614574 X:1 P:656980] send_message_to_others: open fifo_path='/tmp/fifo_1'
[0.000637244 X:2 P:656981] Counted line 1
[0.000663545 X:2 P:656981] send_message_to_others: open fifo_path='/tmp/fifo_1'
[0.000667690 X:0 P:656979] Updated line count: 1
[0.000681505 X:0 P:656979] send_message_to_others: open fifo_path='/tmp/fifo_1'

Side note: I've done many such systems over the years. And, [as I mentioned] I've always used SysV IPC. And, OP's usage of pipes/fifos is for short messages (vs. sending lots of data in a stream). So, IMO, SysV IPC is better suited to the problem and it is working [with less effort ;-)].

The overall issue is a race/sequencing bug in the program logic. The program tries to send a message to a process before the process has set itself up to receive it. So, when trying to write, we either get ENXIO or a hang.

SysV IPC doesn't [seem to] have this issue:

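  1. msgsnd will queue the message without blocking, as long as the queue is not full. When the queue is full it blocks, or with IPC_NOWAIT it fails with EAGAIN instead.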
  2. The message will remain in the kernel buffers (pending).
  3. When the target process finally does a msgrcv, it will get the message.

The real problem is that the target process will only try to receive a message from the SIGINT signal handler. So, (with pipes) the sender gets blocked before it is able to send the signal to the receiver.

With SysV IPC, this use of signals really isn't required. And, the program would be more robust without them (IMO).

Upvotes: 2

Related Questions