Reputation: 59
I'm working on a C program that creates three processes to read and process a file:
Process 1 reads the file line by line
Process 2 counts the lines
Process 3 (main process) maintains the final count
The processes communicate through pipes and should also respond to signals (SIGUSR1, SIGUSR2, SIGCONT, SIGINT) that need to be propagated between them. When one process receives a signal, it should propagate it to the other processes. The signals are set up like this:
signal(SIGUSR1, handle_s1); // Termination
signal(SIGUSR2, handle_s2); // Suspension
signal(SIGCONT, handle_s3); // Resume
signal(SIGINT, handle_s4); // Message handling
When a signal is sent to any process (except parent), it doesn't properly propagate to the other processes. The child processes (pid1 and pid2) don't seem to handle the signals correctly, they only perform the signal action on themselves.
What am I doing wrong in my signal propagation implementation? How can I ensure that signals are properly propagated between all three processes?
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <string.h>
#include <signal.h>
#include <time.h>
#define BUFFER_SIZE 1024
#define FIFO_1 "/tmp/fifo_1"
#define FIFO_2 "/tmp/fifo_2"
#define FIFO_3 "/tmp/fifo_3"
/* Record that send_message_to_others() writes to the FIFOs and handle_s4() reads back. */
typedef struct {
int signal_type; /* caller-supplied tag; every call in main() passes 4 */
pid_t sender_pid; /* getpid() of the process that built the message */
char message[256]; /* short free-form text */
int line_count; /* line number / running count carried with the text */
} Message;
/* Set to 1 by handle_s2 and back to 0 by handle_s3; the work loops poll it. */
volatile sig_atomic_t is_suspended = 0;
/* Set to 1 by handle_s1; tested in the condition of every work loop. */
volatile sig_atomic_t should_exit = 0;
/* Assigned from time(NULL) in main(); not read anywhere in the code shown. */
time_t start_time;
/* pid1 and pid2 hold the fork() return values; pid3 is assigned getpid() by the parent only. */
pid_t pid1, pid2, pid3;
/*
 * Forward `sig` to the two processes other than the sender.
 *
 * sender_pid is compared against the globals pid1, pid2 and pid3 (in that
 * order). On the first match a log line is printed and `sig` is sent with
 * kill() to the remaining two pids. If sender_pid matches none of them the
 * function does nothing.
 */
void propagate_signal(int sig, pid_t sender_pid) {
    pid_t first_target;
    pid_t second_target;

    if (sender_pid == pid1) {
        printf("Propagating signal from Process 1 (PID: %d) to Process 2 and Process 3\n", sender_pid);
        first_target = pid2;
        second_target = pid3;
    } else if (sender_pid == pid2) {
        printf("Propagating signal from Process 2 (PID: %d) to Process 1 and Process 3\n", sender_pid);
        first_target = pid1;
        second_target = pid3;
    } else if (sender_pid == pid3) {
        printf("Propagating signal from Process 3 (PID: %d) to Process 1 and Process 2\n", sender_pid);
        first_target = pid1;
        second_target = pid2;
    } else {
        /* Unknown sender: nothing to forward. */
        return;
    }

    kill(first_target, sig);
    kill(second_target, sig);
}
/*
 * SIGUSR1 handler ("S1", termination): raises should_exit, logs the event and
 * asks propagate_signal() to forward SIGUSR1 on behalf of this process.
 * The `sig` argument is not used.
 * NOTE(review): printf() is not listed as async-signal-safe (man 7 signal-safety).
 */
void handle_s1(int sig) {
should_exit = 1;
printf("Process %d: Received termination signal (S1)\n", getpid());
propagate_signal(SIGUSR1, getpid());
}
/*
 * SIGUSR2 handler ("S2", suspension): raises is_suspended, logs the event and
 * asks propagate_signal() to forward SIGUSR2 on behalf of this process.
 * The `sig` argument is not used.
 * NOTE(review): printf() is not listed as async-signal-safe (man 7 signal-safety).
 */
void handle_s2(int sig) {
is_suspended = 1;
printf("Process %d: Received suspension signal (S2)\n", getpid());
propagate_signal(SIGUSR2, getpid());
}
/*
 * SIGCONT handler ("S3", resume): clears is_suspended, logs the event and
 * asks propagate_signal() to forward SIGCONT on behalf of this process.
 * The `sig` argument is not used.
 * NOTE(review): printf() is not listed as async-signal-safe (man 7 signal-safety).
 */
void handle_s3(int sig) {
is_suspended = 0;
printf("Process %d: Received resume signal (S3)\n", getpid());
propagate_signal(SIGCONT, getpid());
}
/*
 * SIGINT handler ("S4", message handling).
 *
 * Builds the path "/tmp/fifo_<pid of this process>", opens it read-only and
 * non-blocking, and tries to read one Message from it. When the read returns
 * data, the sender and text are printed, SIGINT is forwarded through
 * propagate_signal(), and a positive line_count is printed as well.
 * Nothing happens when the FIFO cannot be opened. The `sig` argument is not used.
 */
void handle_s4(int sig) {
    char path[20];

    sprintf(path, "/tmp/fifo_%d", getpid());

    int fd = open(path, O_RDONLY | O_NONBLOCK);
    if (fd < 0) {
        return;
    }

    Message incoming;
    ssize_t got = read(fd, &incoming, sizeof(incoming));

    if (got > 0) {
        printf("Process %d: Received message from process %d: %s\n",
               getpid(), incoming.sender_pid, incoming.message);
        propagate_signal(SIGINT, getpid());
        if (incoming.line_count > 0) {
            printf("Line count information received: %d\n", incoming.line_count);
        }
    }

    close(fd);
}
/*
 * Build one Message and write a copy of it to /tmp/fifo_1, /tmp/fifo_2 and
 * /tmp/fifo_3.
 *
 * signal_type  tag stored in the message
 * msg          text to send; truncated to fit Message.message (NULL is sent
 *              as an empty string)
 * line_count   number stored in the message
 *
 * Delivery is best-effort: each FIFO is opened write-only and non-blocking,
 * and a FIFO that cannot be opened is skipped silently. A failed or short
 * write is reported with perror() and does not stop the loop.
 */
void send_message_to_others(int signal_type, const char* msg, int line_count) {
    Message message;

    /* Zero the whole record first so that padding and the unused tail of
     * message.message never carry indeterminate stack bytes into the FIFO. */
    memset(&message, 0, sizeof(message));
    message.signal_type = signal_type;
    message.sender_pid = getpid();
    message.line_count = line_count;
    /* snprintf always NUL-terminates, unlike strncpy with a short limit. */
    snprintf(message.message, sizeof(message.message), "%s", msg ? msg : "");

    for (int i = 1; i <= 3; i++) {
        char fifo_path[20];
        snprintf(fifo_path, sizeof(fifo_path), "/tmp/fifo_%d", i);

        int fd = open(fifo_path, O_WRONLY | O_NONBLOCK);
        if (fd < 0) {
            /* Best-effort: skip a FIFO that cannot be opened right now. */
            continue;
        }

        if (write(fd, &message, sizeof(message)) != (ssize_t)sizeof(message)) {
            perror("write to fifo");
        }
        close(fd);
    }
}
/*
 * Entry point. Expects exactly one argument, the path of the file to process.
 *
 * Creates the three FIFOs and two pipes, installs the four signal handlers and
 * forks twice:
 *   - first child  ("Process 1") copies the file line by line into pipe1;
 *   - second child ("Process 2") counts newlines read from pipe1 and writes
 *     the running count into pipe2;
 *   - the parent   ("Process 3") reads the counts from pipe2 and reports them.
 *
 * Returns 0 after both children have been reaped; exits with status 1 on a
 * usage error or when pipe() or fopen() fails.
 */
int main(int argc, char *argv[]) {
if (argc != 2) {
fprintf(stderr, "Usage: %s <file_path>\n", argv[0]);
exit(1);
}
/* mkfifo() results are not checked here. */
mkfifo(FIFO_1, 0666);
mkfifo(FIFO_2, 0666);
mkfifo(FIFO_3, 0666);
/* Handlers are installed before fork(), so both children inherit them. */
signal(SIGUSR1, handle_s1);
signal(SIGUSR2, handle_s2);
signal(SIGCONT, handle_s3);
signal(SIGINT, handle_s4);
int pipe1[2], pipe2[2];
start_time = time(NULL);
if (pipe(pipe1) == -1 || pipe(pipe2) == -1) {
perror("pipe");
exit(1);
}
/* fork() returns 0 in the child, so inside Process 1 pid1 is 0, and pid2 and
 * pid3 have not been assigned yet. A -1 return (fork failure) is not handled. */
pid1 = fork();
if (pid1 == 0) {
pid_t pid = getpid();
printf("Process 1 (PID: %d) started\n", pid);
/* Process 1 only writes to pipe1; close every other pipe end. */
close(pipe1[0]);
close(pipe2[0]);
close(pipe2[1]);
FILE *file = fopen(argv[1], "r");
if (file == NULL) {
perror("fopen");
exit(1);
}
char buffer[BUFFER_SIZE];
size_t bytes_read;
int line_number = 0;
while (fgets(buffer, BUFFER_SIZE, file) != NULL && !should_exit) {
/* Poll every 100 ms while suspended. */
while(is_suspended) {
usleep(100000);
}
bytes_read = strlen(buffer);
if (write(pipe1[1], buffer, bytes_read) != bytes_read) {
perror("write to pipe");
break;
}
line_number++;
printf("Process 1: Read line %d: %s", line_number, buffer);
send_message_to_others(4, "Read new line", line_number);
usleep(100000);
}
printf("Process 1: Finished reading file (read %d lines)\n", line_number);
fclose(file);
/* Closing the write end lets Process 2 see end-of-file on pipe1. */
close(pipe1[1]);
exit(0);
}
/* Inside Process 2 pid2 is 0 and pid3 has not been assigned; pid1 holds
 * the pid of Process 1. */
pid2 = fork();
if (pid2 == 0) {
pid_t pid = getpid();
printf("Process 2 (PID: %d) started\n", pid);
/* Process 2 reads pipe1 and writes pipe2. */
close(pipe1[1]);
close(pipe2[0]);
char ch;
int line_count = 0;
ssize_t bytes_read;
int last_reported_count = 0;
/* One byte per read(); a newline marks the end of a line. */
while ((bytes_read = read(pipe1[0], &ch, 1)) > 0 && !should_exit) {
while(is_suspended) {
usleep(100000);
}
if (ch == '\n') {
line_count++;
if (line_count != last_reported_count) {
printf("Process 2: Counted line %d\n", line_count);
/* The running count is sent as a raw int. */
write(pipe2[1], &line_count, sizeof(line_count));
send_message_to_others(4, "Counted line", line_count);
last_reported_count = line_count;
}
}
}
printf("Process 2: Finishing operation\n");
close(pipe1[0]);
close(pipe2[1]);
exit(0);
}
/* Only the parent reaches this point: it is "Process 3" and is the only
 * process in which pid1, pid2 and pid3 all hold real pids. */
pid3 = getpid();
printf("Process 3 (main, PID: %d) started\n", pid3);
/* The parent only reads pipe2. */
close(pipe1[0]);
close(pipe1[1]);
close(pipe2[1]);
int last_count = 0;
int final_count;
while (read(pipe2[0], &final_count, sizeof(final_count)) > 0 && !should_exit) {
while(is_suspended) {
usleep(100000);
}
if (final_count != last_count) {
printf("Main process: Updated line count: %d\n", final_count);
send_message_to_others(4, "Updated line count", final_count);
last_count = final_count;
}
}
printf("Main process: Finishing operation. Final line count: %d\n", last_count);
/* Reap both children, then remove the FIFOs created at start-up. */
waitpid(pid1, NULL, 0);
waitpid(pid2, NULL, 0);
close(pipe2[0]);
unlink(FIFO_1);
unlink(FIFO_2);
unlink(FIFO_3);
return 0;
}
Upvotes: 3
Views: 85
Reputation: 33631
You fork your processes in 1,2,3 order.
Remember that fork returns a pid to the parent and 0 to the child process.
Thus, a given process (e.g. pidX where X is 1, 2, or 3) can only know about lower-numbered pids. For each process launched/forked, designated 1-3:
- Process 1 knows none of them (pid1, pid2 and pid3 are 0).
- Process 2 knows pid1 (pid2 and pid3 will be 0).
- Process 3 knows pid2 and pid1 (pid3 will be zero).
With your code, I bet some pidX are 0. To verify this, have each child process do an immediate:
printf("%d: pid1=%d pid2=%d pid3=%d\n",getpid(),pid1,pid2,pid3);
So, for example, if process 1 is trying to send to pid2 and pid3, they will be 0.
This means that when process 1 is doing kill(pid2,SIGTERM); it is really doing kill(0,SIGTERM).
Note that when we do kill(0,signo), we are sending the signal to all members of our process group. This includes our pid and the pid of our parent as well as any children our parent forks. In fact, a child normally inherits its parent's process group (whose id is the parent's pid when the parent is the group leader).
So, with your current code, if process 1 has any effect on any of the other processes (pid2, pid3), it's because it's doing kill(0,signo) serendipitously.
Method A: So, just having each process do a kill(0,signo) would probably be better. It means that the process would have to keep track of a few more things because it will send a signal to itself.
Method B: An alternative is if the parent's pid is (e.g.) parent_pid, have each child send just one signal to the parent, e.g. kill(parent_pid,SIGTERM). Then, have the parent do: kill(0,SIGTERM)
To prevent "infinite" signal "thrashing", you may want to mask off signals so that only certain processes will receive certain signals. You can set up a per-process signal mask with sigprocmask.
So, with method A, the parent should block all signals. Then, the first thing a child does (after the fork) is unblock them. Then, process group kill should work as desired.
Side note: You can't safely use printf in a signal handler. See: man signal-safety
UPDATE:
Doing sprintf(fifo_path,"/tmp/fifo_%d",getpid()) in handle_s4 will never produce /tmp/fifo_1 et al.
I've added a safe version of printf.
And, a lot more error checking.
Note that when we get to pid3, because the code does not have: if (pid3 == 0) { ... } like the others, it will be executed twice. Once by the child and once by the parent. So, I put that code under such an if.
And, some heavy rework, using sigaction.
It is still broken but it may give you some ideas. It still fails the open in send_message_to_others for the fifo.
These errors could be latent bugs by you (because the message couldn't have worked because of the bug in handle_s4). Or, they could be bugs I've created. I didn't have time to debug further.
Personally, I might replace the mkfifo et al. with SysV IPC: msgget/msgsnd/msgrcv [as I've never had a problem with those]:
- Use msgget to create one queue.
- Give each message a msg_id (e.g. value of 1, 2, or 3) which denotes the "destination".
Anyway, here's the code:
Edit: Because of UPDATE #3, to stay within SO space limits, I've had to redact this [early] version of the code in favor of the later version below.
UPDATE #2:
I misread your intention regarding pid3. That is, I didn't see that you were doing pid3 = getpid(); and thought you were doing pid3 = fork();. My bad ;-) After other fixes, your original code seems to work better.
I couldn't get the fifo code to work. It kept giving ENXIO errors in send_message_to_others.
So, I changed the code to use SysV IPC (by default). If you'd like to see the original behavior using mkfifo, compile with -DUSE_FIFO
Now, the code at least runs to completion. I'm not totally sure about your full intent with this regarding all the cross-signalling, so I'll leave it at that.
So, here is the updated code:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <string.h>
#include <signal.h>
#include <time.h>
#if 1
#include <errno.h>
#include <stdarg.h>
#include <sys/file.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#endif
#ifndef USE_PIDM
#define USE_PIDM 1
#endif
#ifndef USE_FIFO
#define USE_FIFO 0
#endif
// countof -- number of elements in an array object (not meaningful on a pointer)
#define countof(_x) (sizeof(_x) / sizeof(_x[0]))
// GOTSIG -- log a received signal through prtf. The expansion refers to the
// caller's own `sig` and `info` identifiers, so it can only be used inside a
// handler whose parameters have exactly those names. The last field printed
// is the sender's pid relative to pidm.
#define GOTSIG(_who) \
prtf(_who ": sig=%d si_pid=%d/%d\n", \
sig,info->si_pid,info->si_pid - pidm)
#define BUFFER_SIZE 1024
// Transport selection: with USE_FIFO the messages travel through three named
// FIFOs, otherwise through one SysV message queue.
#if USE_FIFO
#if 0
#define FIFO_1 "/tmp/fifo_1"
#define FIFO_2 "/tmp/fifo_2"
#define FIFO_3 "/tmp/fifo_3"
#else
// FIFO paths indexed by process number 1-3; slot 0 is unused
const char *fifolist[4] = {
NULL,
"/tmp/fifo_1",
"/tmp/fifo_2",
"/tmp/fifo_3",
};
#endif
#else
// queue identifier returned by msgget() in main
int msg_fd;
#endif
// Message -- record passed between the processes, either written to a FIFO or
// sent with msgsnd
typedef struct {
#if USE_FIFO == 0
// SysV message type; send_message_to_others sets it to the destination (1-3)
long msg_id;
#endif
int signal_type; // caller-supplied tag; every call in main passes 4
pid_t sender_pid; // getpid() of the sending process
char message[256]; // short free-form text
int line_count; // line number / running count carried with the text
} Message;
// set by handle_s2, cleared by handle_s3; the work loops poll it
volatile sig_atomic_t is_suspended = 0;
// set by handle_s1; tested in every work loop condition
volatile sig_atomic_t should_exit = 0;
// assigned from time(NULL) in main; not read in the code shown
time_t start_time;
#if 0
pid_t pid1, pid2, pid3;
#else
// pids kept in one array so handle_hup can iterate over them;
// slot 0 is the master/parent, slots 1-3 are the numbered processes
pid_t pidlist[4];
#define pidm pidlist[0]
#define pid1 pidlist[1]
#define pid2 pidlist[2]
#define pid3 pidlist[3]
#endif
pid_t pidself; // cached value of getpid()
int xidself; // 0-3
int xidnew; // 0-3
// prtattr -- mark a function as printf-like so the compiler checks its
// arguments; _lvl is the 1-based position of the format string and the
// variadic arguments are taken to start right after it
#define prtattr(_lvl) \
__attribute__((__format__(__printf__,_lvl,_lvl + 1)))
// time origin subtracted by tscgetf; main sets it from the first tscgetf() call
double tsczero;
// tscgetf -- seconds on the CLOCK_MONOTONIC clock, as a double, minus tsczero
double
tscgetf(void)
{
	struct timespec now;

	clock_gettime(CLOCK_MONOTONIC,&now);

	// fractional part first, then whole seconds, then shift by the origin
	double elapsed = now.tv_nsec;
	elapsed /= 1e9;
	elapsed += now.tv_sec;

	return elapsed - tsczero;
}
// prtfault -- print a message through prtf and terminate the calling process
// with exit status 1; wrapped in do/while(0) so it behaves as one statement
#define prtfault(_fmt...) \
do { \
prtf(_fmt); \
exit(1); \
} while (0)
// prtf -- printf replacement meant to be usable from signal handlers
//
// Formats "[<seconds> X:<xidself> P:<pidself>] " followed by the caller's
// message into a fixed stack buffer and emits it with a single write() to
// fd 1, bracketed by flock() so lines from different processes stay whole.
//
// Output longer than the buffer is truncated (and forced to end in a newline)
// instead of overrunning the stack: callers pass whole input lines of up to
// BUFFER_SIZE bytes through "%s", which is larger than buf.
// errno is saved and restored so a handler that logs does not disturb the
// interrupted code.
void prtattr(1)
prtf(const char *fmt,...)
{
	va_list ap;
	char buf[1000];
	size_t len = 0;
	int cnt;
	int saved_errno = errno;

	// printf uses the heap but snprintf doesn't [99.44% of the time ;-)]

	// show which pid said what
	cnt = snprintf(buf,sizeof(buf),"[%.9f X:%d P:%d] ",
		tscgetf(),xidself,(int) pidself);
	if (cnt > 0)
		len = ((size_t) cnt < sizeof(buf)) ? (size_t) cnt : sizeof(buf) - 1;

	va_start(ap,fmt);
	cnt = vsnprintf(buf + len,sizeof(buf) - len,fmt,ap);
	va_end(ap);

	if (cnt > 0) {
		// vsnprintf stores at most (space - 1) characters plus the NUL
		size_t room = sizeof(buf) - len - 1;

		if ((size_t) cnt > room) {
			len += room;
			// keep truncated output line-oriented
			if (len > 0)
				buf[len - 1] = '\n';
		}
		else
			len += (size_t) cnt;
	}

	flock(1,LOCK_EX);
	ssize_t wlen = write(1,buf,len);
	(void) wlen;
	flock(1,LOCK_UN);

	errno = saved_errno;
}
// propagate_signal -- ask the master process to relay a signal
//
// Three alternative implementations are kept under cpp conditionals; only the
// last one is compiled. The active code logs `sig` and then queues SIGHUP to
// pidm with sigqueue(), carrying an integer value of 0.
// NOTE(review): in the active branch neither `sig` nor `sender_pid` influences
// what is sent -- the signal is always SIGHUP and the value is always 0.
void
propagate_signal(int sig, pid_t sender_pid)
{
// old: original pairwise kill() logic
#if 0
if (sender_pid == pid1) {
prtf("Propagating signal from Process 1 (PID: %d) to Process 2 and Process 3\n", sender_pid);
kill(pid2, sig);
kill(pid3, sig);
}
else if (sender_pid == pid2) {
prtf("Propagating signal from Process 2 (PID: %d) to Process 1 and Process 3\n", sender_pid);
kill(pid1, sig);
kill(pid3, sig);
}
else if (sender_pid == pid3) {
prtf("Propagating signal from Process 3 (PID: %d) to Process 1 and Process 2\n", sender_pid);
kill(pid1, sig);
kill(pid2, sig);
}
#endif
// alternative: signal the whole process group
#if 0
kill(0,sig);
#endif
// active: notify the master, whose SIGHUP handler (handle_hup) does the relaying
#if 1
prtf("propagate_signal: sig=%d\n",sig);
union sigval sigval;
sigval.sival_int = 0;
sigqueue(pidm,SIGHUP,sigval);
#endif
}
// handle_s1 -- SIGUSR1 ("S1", termination) handler: logs the signal and its
// sender, sets should_exit and calls propagate_signal
// `vp` is unused; `sig` and `info` are consumed by the GOTSIG macro
void
handle_s1(int sig,siginfo_t *info,void *vp)
{
GOTSIG("handle_s1");
should_exit = 1;
prtf("Received termination signal (S1)\n");
propagate_signal(SIGUSR1, pidself);
}
// handle_s2 -- SIGUSR2 ("S2", suspension) handler: logs the signal and its
// sender and sets is_suspended
// `vp` is unused; `sig` and `info` are consumed by the GOTSIG macro
//
// NOTE(review): POSIX kill() takes (pid, sig). The active call below passes
// (sig, pid), so pidself ends up in the signal-number position -- presumably
// the call fails with EINVAL and sends nothing. Simply swapping the arguments
// would make this SIGUSR2 handler raise SIGUSR2 at its own process again, so
// confirm the intended target before changing it.
void
handle_s2(int sig,siginfo_t *info,void *vp)
{
GOTSIG("handle_s2");
is_suspended = 1;
prtf("Received suspension signal (S2)\n");;
#if 0
propagate_signal(SIGUSR2, pidself);
#else
kill(SIGUSR2,pidself);
#endif
}
// handle_s3 -- SIGCONT ("S3", resume) handler: logs the signal and its sender
// and clears is_suspended
// `vp` is unused; `sig` and `info` are consumed by the GOTSIG macro
//
// NOTE(review): POSIX kill() takes (pid, sig). The active call below passes
// (sig, pid), so pidself ends up in the signal-number position -- presumably
// the call fails with EINVAL and sends nothing. Simply swapping the arguments
// would make this SIGCONT handler raise SIGCONT at its own process again, so
// confirm the intended target before changing it.
void
handle_s3(int sig,siginfo_t *info,void *vp)
{
GOTSIG("handle_s3");
is_suspended = 0;
prtf("Received resume signal (S3)\n");
#if 0
propagate_signal(SIGCONT, pidself);
#else
kill(SIGCONT,pidself);
#endif
}
// handle_s4 -- SIGINT ("S4", message handling) handler
//
// Fetches one Message addressed to this process -- from its FIFO when built
// with USE_FIFO, otherwise from the SysV queue -- logs it, calls
// propagate_signal and logs a positive line_count.
// `vp` is unused; `sig` and `info` are consumed by the GOTSIG macro.
//
// Details that matter:
// - xidself is 0 in the master process. The FIFO path already maps that to
//   slot 3; the queue path now uses the same mapping, because a message type
//   of 0 makes msgrcv return the first message of ANY type.
// - msgrcv's size argument counts only the bytes after the leading long, so
//   sizeof(msg_id) is subtracted. MSG_NOERROR truncates an oversized message
//   instead of failing, so a sender that transmits a larger size is tolerated.
// - the text is forced to be NUL-terminated before it is printed with %s.
void
handle_s4(int sig,siginfo_t *info,void *vp)
{
	Message msg;
	int gotmsg;

	GOTSIG("handle_s4");

	// a short transfer must not leave indeterminate fields behind
	memset(&msg,0,sizeof(msg));

	int slot = xidself ? xidself : 3;

#if USE_FIFO
	// NOTE: the path has to be one of fifo_1..fifo_3 -- a name built from
	// getpid() never matches what the senders open
	const char *fifo_path = fifolist[slot];

#if USE_FIFO >= 2
	prtf("handle_s4: open fifo_path='%s'\n",fifo_path);
	int fifo_fd = open(fifo_path, O_RDONLY);
#else
	int fifo_fd = open(fifo_path, O_RDONLY | O_NONBLOCK);
#endif
	if (fifo_fd < 0)
		prtfault("handle_s4: xidself=%d fifo_path='%s' open failed -- %s\n",
			xidself,fifo_path,strerror(errno));

	prtf("handle_s4: read ...\n");
	gotmsg = (read(fifo_fd, &msg, sizeof(msg)) > 0);
#else
	prtf("handle_s4: msgrcv xidself=%d ...\n",xidself);
	gotmsg = (msgrcv(msg_fd, &msg, sizeof(msg) - sizeof(msg.msg_id),
		slot, MSG_NOERROR) > 0);
#endif

	if (gotmsg) {
		msg.message[sizeof(msg.message) - 1] = '\0';
		prtf("Received message from process %d: %s\n",
			msg.sender_pid, msg.message);
		propagate_signal(SIGINT, pidself);
		if (msg.line_count > 0) {
			prtf("Line count information received: %d\n", msg.line_count);
		}
	}

#if USE_FIFO
	close(fifo_fd);
#endif
}
// handle_hup -- SIGHUP handler: relay the received signal to the other
// processes recorded in pidlist
//
// Skipped entries:
// - slots that are not a real pid (<= 0). An unassigned slot is 0 and a
//   failed fork leaves -1; kill() treats those values as "my whole process
//   group" and "every process I may signal", which is never what is wanted.
// - this process itself
// - the process that sent us the signal (info->si_pid)
// - a pid that already appeared in an earlier slot (pidm and pid3 can be the
//   same process)
//
// `vp` is unused; `sig` and `info` are also consumed by the GOTSIG macro.
// NOTE(review): what gets relayed is `sig` itself, i.e. SIGHUP when this is
// installed for SIGHUP -- not the signal originally given to propagate_signal.
void
handle_hup(int sig,siginfo_t *info,void *vp)
{
	GOTSIG("handle_hup");

	for (size_t idx = 0; idx < countof(pidlist); ++idx) {
		pid_t pidcur = pidlist[idx];

		// not a real pid
		if (pidcur <= 0)
			continue;

		// don't send to self
		if (pidcur == pidself)
			continue;

		// don't send to originator
		if (pidcur == info->si_pid)
			continue;

		// don't send twice to the same process
		int seen = 0;
		for (size_t prev = 0; prev < idx; ++prev) {
			if (pidlist[prev] == pidcur) {
				seen = 1;
				break;
			}
		}
		if (seen)
			continue;

		prtf("handle_hup: sending to idx=%d pidcur=%d\n",(int) idx,(int) pidcur);
		kill(pidcur,sig);
	}
}
// send_message_to_others -- build one Message and send a copy to each of the
// three destinations (FIFO 1-3 with USE_FIFO, message types 1-3 otherwise)
//
// signal_type  tag stored in the message
// msg          text; truncated to fit Message.message (NULL is sent as "")
// line_count   number stored in the message
//
// Details that matter:
// - the record is zeroed first, so padding and the unused tail of the text
//   never carry indeterminate stack bytes, and the text is always
//   NUL-terminated
// - msgsnd's size argument counts only the bytes after the leading long, so
//   sizeof(msg_id) is subtracted; passing the full struct size makes the
//   kernel read past the end of `message`
// - msgsnd uses IPC_NOWAIT: the queue is only drained from the SIGINT
//   handler, so a blocking send would stall the sender once the queue fills.
//   A failed send is logged and skipped.
void
send_message_to_others(int signal_type, const char *msg, int line_count)
{
	Message message;

	memset(&message,0,sizeof(message));
	message.signal_type = signal_type;
	message.sender_pid = getpid();
	message.line_count = line_count;
	snprintf(message.message,sizeof(message.message),"%s",msg ? msg : "");

#if USE_FIFO
	for (int i = 1; i <= 3; i++) {
		const char *fifo_path = fifolist[i];

		prtf("send_message_to_others: open fifo_path='%s'\n",fifo_path);
#if USE_FIFO >= 2
		int fd = open(fifo_path, O_WRONLY);
#else
		int fd = open(fifo_path, O_WRONLY | O_NONBLOCK);
#endif
		if (fd < 0)
			prtfault("send_message_to_others: i=%d fifo_path='%s' open failed -- %s\n",
				i,fifo_path,strerror(errno));

		int xlen = (int) write(fd, &message, sizeof(message));
		prtf("send_message_to_others: i=%d xlen=%d sizeof=%zu\n",
			i,xlen,sizeof(message));
		close(fd);
	}
#else
	const size_t paylen = sizeof(message) - sizeof(message.msg_id);

	for (int i = 1; i <= 3; i++) {
		message.msg_id = i;
		int xlen = msgsnd(msg_fd, &message, paylen, IPC_NOWAIT);
		// capture errno before any logging can change it
		int err = errno;

		prtf("send_message_to_others: i=%d xlen=%d sizeof=%zu\n",
			i,xlen,sizeof(message));
		if (xlen < 0)
			prtf("send_message_to_others: i=%d msgsnd failed -- %s\n",
				i,strerror(err));
	}
#endif
}
// setup_handler -- install a three-argument (siginfo) handler for signo
//
// signo    signal number to catch
// handler  function called as handler(sig, info, context)
//
// SA_SIGINFO is required whenever sa_sigaction is used: without it the kernel
// calls the function with the signal number only, and the handlers' `info`
// argument (read by GOTSIG and handle_hup) is not valid.
// SA_RESTART makes an interrupted read()/wait() resume instead of failing
// with EINTR, which would otherwise end the work loops on any signal.
// Exits with status 1 if the handler cannot be installed.
void
setup_handler(int signo,void (*handler)(int,siginfo_t *,void *))
{
	struct sigaction act;

	memset(&act,0,sizeof(act));
	act.sa_sigaction = handler;
	sigemptyset(&act.sa_mask);
	act.sa_flags = SA_SIGINFO | SA_RESTART;

	if (sigaction(signo,&act,NULL) < 0) {
		perror("sigaction");
		exit(1);
	}
}
// start_child -- first call made by a freshly forked child: adopt the index
// that main incremented just before the fork (xidnew) as xidself, cache
// getpid() in pidself, and log the start
void
start_child(void)
{
xidself = xidnew;
pidself = getpid();
prtf("start_child: started\n");
}
// main -- set up IPC and handlers, fork the workers, run the final stage
//
// argv[1] is the file to process.
//   process 1 (child)  copies the file line by line into pipe1
//   process 2 (child)  counts newlines from pipe1, writes the count to pipe2
//   process 3          reads the counts from pipe2 and reports them; it is
//                      the master itself with USE_PIDM, else a third child
// The master then reaps every child and releases the IPC objects.
//
// Changes relative to the previous revision:
// - every fork() result is checked; a -1 stored in pidlist would later be
//   handed to kill()
// - the SysV queue is removed with msgctl(IPC_RMID) on the normal exit path;
//   a queue created with msgget is not released when the process exits
// - pipe write results are checked and compared without mixing signedness
//
// Returns 0 on normal completion; exits with status 1 on setup errors.
int
main(int argc, char *argv[])
{
	if (argc != 2) {
		fprintf(stderr, "Usage: %s <file_path>\n", argv[0]);
		exit(1);
	}

	tsczero = tscgetf();

	// master/parent pid
	pidm = getpid();
	pidself = pidm;
	prtf("main: pidm=%d\n",pidm);

#if USE_FIFO
	for (size_t idx = 1; idx < countof(fifolist); ++idx) {
		const char *file = fifolist[idx];

		// start from a clean slate: drop any FIFO left by an earlier run
		unlink(file);
		int err = mkfifo(file, 0666);
		if (err < 0)
			prtfault("mkfifo: %s -- %s\n",file,strerror(errno));
	}
#else
	msg_fd = msgget(IPC_PRIVATE,IPC_CREAT | 0600);
	if (msg_fd < 0)
		prtfault("main: msgget fail -- %s\n",strerror(errno));
#endif

	// installed before the forks so that every child inherits them
#if 0
	signal(SIGUSR1, handle_s1);
	signal(SIGUSR2, handle_s2);
	signal(SIGCONT, handle_s3);
	signal(SIGINT, handle_s4);
#else
	setup_handler(SIGUSR1, handle_s1);
	setup_handler(SIGUSR2, handle_s2);
	setup_handler(SIGCONT, handle_s3);
	setup_handler(SIGINT, handle_s4);
	setup_handler(SIGHUP, handle_hup);
#endif

	int pipe1[2], pipe2[2];

	start_time = time(NULL);

	if (pipe(pipe1) == -1 || pipe(pipe2) == -1) {
		perror("pipe");
		exit(1);
	}

	// process 1: file reader
	++xidnew;
	pid1 = fork();
	if (pid1 < 0)
		prtfault("main: fork 1 fail -- %s\n",strerror(errno));
	if (pid1 == 0) {
		start_child();

		// only the write end of pipe1 is needed here
		close(pipe1[0]);
		close(pipe2[0]);
		close(pipe2[1]);

		FILE *file = fopen(argv[1], "r");
		if (file == NULL) {
			perror("fopen");
			exit(1);
		}

		char buffer[BUFFER_SIZE];
		size_t bytes_read;
		int line_number = 0;

		while (fgets(buffer, BUFFER_SIZE, file) != NULL && !should_exit) {
			while (is_suspended) {
				usleep(100000);
			}
			bytes_read = strlen(buffer);
			if (write(pipe1[1], buffer, bytes_read) != (ssize_t) bytes_read) {
				perror("write to pipe");
				break;
			}
			line_number++;
			prtf("Read line %d: %s", line_number, buffer);
			send_message_to_others(4, "Read new line", line_number);
			usleep(100000);
		}

		prtf("Finished reading file (read %d lines)\n", line_number);
		fclose(file);
		// closing the write end gives process 2 its end-of-file
		close(pipe1[1]);
		exit(0);
	}

	// process 2: line counter
	++xidnew;
	pid2 = fork();
	if (pid2 < 0)
		prtfault("main: fork 2 fail -- %s\n",strerror(errno));
	if (pid2 == 0) {
		start_child();

		close(pipe1[1]);
		close(pipe2[0]);

		char ch;
		int line_count = 0;
		ssize_t bytes_read;
		int last_reported_count = 0;

		while ((bytes_read = read(pipe1[0], &ch, 1)) > 0 && !should_exit) {
			while (is_suspended) {
				usleep(100000);
			}
			if (ch == '\n') {
				line_count++;
				if (line_count != last_reported_count) {
					prtf("Counted line %d\n", line_count);
					if (write(pipe2[1], &line_count, sizeof(line_count)) !=
						(ssize_t) sizeof(line_count)) {
						perror("write to pipe");
						break;
					}
					send_message_to_others(4, "Counted line", line_count);
					last_reported_count = line_count;
				}
			}
		}

		prtf("Finishing operation\n");
		close(pipe1[0]);
		close(pipe2[1]);
		exit(0);
	}

	// process 3: final count -- the master itself, or a third child
#if USE_PIDM
	pid3 = pidm;
	if (1) {
		prtf("main: starting pid3 as parent\n");
#else
	++xidnew;
	pid3 = fork();
	if (pid3 < 0)
		prtfault("main: fork 3 fail -- %s\n",strerror(errno));
	if (pid3 == 0) {
		start_child();
#endif

		close(pipe1[0]);
		close(pipe1[1]);
		close(pipe2[1]);

		int last_count = 0;
		int final_count;

		while (read(pipe2[0], &final_count, sizeof(final_count)) > 0 &&
			!should_exit) {
			while (is_suspended) {
				usleep(100000);
			}
			if (final_count != last_count) {
				prtf("Updated line count: %d\n", final_count);
				send_message_to_others(4, "Updated line count", final_count);
				last_count = final_count;
			}
		}

		prtf("Finishing operation. Final line count: %d\n", last_count);
#if USE_PIDM == 0
		exit(0);
#endif
	}

	// reap every child, whatever order they finish in
#if 0
	waitpid(pid1, NULL, 0);
	waitpid(pid2, NULL, 0);
#else
	while (1) {
		prtf("Waiting\n");
		pid_t pidreap = wait(NULL);
		if (pidreap < 0)
			break;
		prtf("Reaping %d/%d\n",pidreap,pidreap - pidm);
	}
#endif

	close(pipe2[0]);

#if USE_FIFO
	for (size_t idx = 1; idx < countof(fifolist); ++idx)
		unlink(fifolist[idx]);
#else
	// a SysV queue stays allocated until it is removed explicitly
	if (msgctl(msg_fd, IPC_RMID, NULL) < 0)
		prtf("main: msgctl(IPC_RMID) fail -- %s\n",strerror(errno));
#endif

	return 0;
}
In the code above, I've used cpp conditionals to denote old vs. new code:
#if 0
// old code
#else
// new code
#endif
#if 1
// new code
#endif
Note: this can be cleaned up by running the file through unifdef -k
For this particular code, unifdef -k -iUUSE_FIFO -iDUSE_PIDM could get a [fully] cleaned up version.
Here is the input file I used:
hello world
goodbye galaxy
universe eternal
Here is the program output when compiled with -DUSE_FIFO:
[0.000004085 X:0 P:595644] main: pidm=595644
[0.000346837 X:1 P:595645] start_child: started
[0.000381570 X:0 P:595644] main: starting pid3 as parent
[0.000421851 X:2 P:595646] start_child: started
[0.000479973 X:1 P:595645] Read line 1: hello world
[0.000487642 X:2 P:595646] Counted line 1
[0.000503542 X:1 P:595645] send_message_to_others: i=1 fifo_path='/tmp/fifo_1' open failed -- No such device or address
[0.000517247 X:2 P:595646] send_message_to_others: i=1 fifo_path='/tmp/fifo_1' open failed -- No such device or address
[0.000538922 X:0 P:595644] Updated line count: 1
[0.000566704 X:0 P:595644] send_message_to_others: i=1 fifo_path='/tmp/fifo_1' open failed -- No such device or address
Here is the program output using SysV IPC:
[0.000001998 X:0 P:595688] main: pidm=595688
[0.000619131 X:0 P:595688] main: starting pid3 as parent
[0.000642203 X:1 P:595689] start_child: started
[0.000678053 X:2 P:595690] start_child: started
[0.000774182 X:1 P:595689] Read line 1: hello world
[0.000813729 X:2 P:595690] Counted line 1
[0.000879997 X:0 P:595688] Updated line count: 1
[0.001350095 X:1 P:595689] send_message_to_others: i=1 xlen=0 sizeof=280
[0.001352684 X:2 P:595690] send_message_to_others: i=1 xlen=0 sizeof=280
[0.001355468 X:0 P:595688] send_message_to_others: i=1 xlen=0 sizeof=280
[0.001363248 X:1 P:595689] send_message_to_others: i=2 xlen=0 sizeof=280
[0.001364265 X:2 P:595690] send_message_to_others: i=2 xlen=0 sizeof=280
[0.001365704 X:0 P:595688] send_message_to_others: i=2 xlen=0 sizeof=280
[0.001370134 X:2 P:595690] send_message_to_others: i=3 xlen=0 sizeof=280
[0.001370989 X:0 P:595688] send_message_to_others: i=3 xlen=0 sizeof=280
[0.001372129 X:1 P:595689] send_message_to_others: i=3 xlen=0 sizeof=280
[0.101520156 X:1 P:595689] Read line 2: goodbye galaxy
[0.101565808 X:1 P:595689] send_message_to_others: i=1 xlen=0 sizeof=280
[0.101581843 X:1 P:595689] send_message_to_others: i=2 xlen=0 sizeof=280
[0.101597385 X:1 P:595689] send_message_to_others: i=3 xlen=0 sizeof=280
[0.101611671 X:2 P:595690] Counted line 2
[0.101636212 X:2 P:595690] send_message_to_others: i=1 xlen=0 sizeof=280
[0.101641843 X:0 P:595688] Updated line count: 2
[0.101649104 X:2 P:595690] send_message_to_others: i=2 xlen=0 sizeof=280
[0.101658820 X:0 P:595688] send_message_to_others: i=1 xlen=0 sizeof=280
[0.101660533 X:2 P:595690] send_message_to_others: i=3 xlen=0 sizeof=280
[0.101669473 X:0 P:595688] send_message_to_others: i=2 xlen=0 sizeof=280
[0.101678881 X:0 P:595688] send_message_to_others: i=3 xlen=0 sizeof=280
[0.201782499 X:1 P:595689] Read line 3: universe eternal
[0.201823202 X:1 P:595689] send_message_to_others: i=1 xlen=0 sizeof=280
[0.201838128 X:1 P:595689] send_message_to_others: i=2 xlen=0 sizeof=280
[0.201846063 X:1 P:595689] send_message_to_others: i=3 xlen=0 sizeof=280
[0.201873574 X:2 P:595690] Counted line 3
[0.201902347 X:2 P:595690] send_message_to_others: i=1 xlen=0 sizeof=280
[0.201915274 X:2 P:595690] send_message_to_others: i=2 xlen=0 sizeof=280
[0.201922945 X:2 P:595690] send_message_to_others: i=3 xlen=0 sizeof=280
[0.201969388 X:0 P:595688] Updated line count: 3
[0.201994417 X:0 P:595688] send_message_to_others: i=1 xlen=0 sizeof=280
[0.202010472 X:0 P:595688] send_message_to_others: i=2 xlen=0 sizeof=280
[0.202022218 X:0 P:595688] send_message_to_others: i=3 xlen=0 sizeof=280
[0.301994886 X:1 P:595689] Finished reading file (read 3 lines)
[0.302076881 X:2 P:595690] Finishing operation
[0.302118434 X:0 P:595688] Finishing operation. Final line count: 3
[0.302132616 X:0 P:595688] Waiting
[0.302194757 X:0 P:595688] Reaping 595689/1
[0.302204503 X:0 P:595688] Waiting
[0.302265047 X:0 P:595688] Reaping 595690/2
[0.302272118 X:0 P:595688] Waiting
UPDATE #3:
"man 7 fifo" explain your fifo error. – stevea
It's really OP's error (which he couldn't get to because of the sprintf(fifo_path,"/tmp/fifo_%d",getpid()) issue I mentioned previously).
From man 7 fifo:
A process can open a FIFO in nonblocking mode. In this case, opening for read only will succeed even if no-one has opened on the write side yet, opening for write only will fail with ENXIO (no such device or address) unless the other end has already been opened.
So, I changed the code above to do a blocking open (without O_NONBLOCK) if USE_FIFO >= 2.
It fixes the ENXIO error but doesn't solve the overall problem.
Now, unfortunately, send_message_to_others just hangs on the open:
[0.000004437 X:0 P:656979] main: pidm=656979
[0.000427016 X:0 P:656979] main: starting pid3 as parent
[0.000458723 X:1 P:656980] start_child: started
[0.000517052 X:2 P:656981] start_child: started
[0.000606265 X:1 P:656980] Read line 1: hello world
[0.000614574 X:1 P:656980] send_message_to_others: open fifo_path='/tmp/fifo_1'
[0.000637244 X:2 P:656981] Counted line 1
[0.000663545 X:2 P:656981] send_message_to_others: open fifo_path='/tmp/fifo_1'
[0.000667690 X:0 P:656979] Updated line count: 1
[0.000681505 X:0 P:656979] send_message_to_others: open fifo_path='/tmp/fifo_1'
Side note: I've done many such systems over the years. And, [as I mentioned] I've always used SysV IPC. And, OP's usage of pipes/fifos is for short messages (vs. sending lots of data in a stream). So, IMO, SysV IPC is better suited to the problem and it is working [with less effort ;-)].
The overall issue is a race/sequencing bug in the program logic. The program tries to send a message to a process before the process has set itself up to receive it. So, when trying to write, we either get ENXIO or a hang.
SysV IPC doesn't [seem to] have this issue:
- msgsnd will queue the message (and not be blocked).
- When the receiver calls msgrcv, it will get the message.
The real problem is that the target process will only try to receive a message from the SIGINT signal handler. So, (with pipes) the sender gets blocked before it is able to send the signal to the receiver.
With SysV IPC, this use of signals really isn't required. And, the program would be more robust without them (IMO).
Upvotes: 2