Reputation: 23
I've been trying to get my program to work for several hours now and I just can't fgure out what's wrong with my code. It's about passing a variable between processess using pipes. Each process increments it M times. The program works perfectly when I use shared memory, but when I change it to using pipes it's a disaster. Creating or using named pipes doesn't seem to work at all, or I guess I'm just doing it the wrong way. Here's the source code:
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/mman.h>
#include <unistd.h>
#include <memory.h>
#include <fcntl.h>
#include <sys/stat.h>
#define PIPE_NAME "MY_PIPE"
#define N 5
#define M 10
struct sembuf operations;
int semid;
key_t key;
int marker;
void semWait(int semid, int sempos) {
operations.sem_num = sempos;
operations.sem_op = -1;
operations.sem_flg = 0;
if (semop(semid, &operations, 1) < 0) {
perror("ERROR: semop wait\n");
exit(-1);
}
}
void semPost(int semid, int sempos) {
operations.sem_num = sempos;
operations.sem_op = 1;
operations.sem_flg = IPC_NOWAIT;
if (semop(semid, &operations, 1) < 0) {
perror("ERROR: semop post\n");
exit(-1);
}
}
void worker(int id) {
int j, nmarker;
int fd = open(PIPE_NAME, O_RDWR);
read(fd, &nmarker, sizeof(int));
for (j = 0 ; j < M; j++) {
semWait(semid, id);
nmarker = nmarker + 1 ;
printf("%d ", marker);
semPost(semid, N);
}
write(fd, &nmarker, sizeof(nmarker));
close(fd);
}
main() {
int i, tempPID;
int sarray[N+1] = {0};
key = 23;
marker = 0;
if ((semid = semget(key , N+1, 0666 | IPC_CREAT)) == -1) {
perror("ERROR: semget\n");
exit(-1);
}
if ((semctl(semid, N+1, SETALL, sarray)) < 0) {
perror("ERROR: semctl - val\n");
exit(-1);
}
if(mkfifo(PIPE_NAME, S_IFIFO | 0666) < 0) {
perror("ERROR:pipe\n");
exit(-1);
}
int fd;
if( fd = open(PIPE_NAME, O_WRONLY) < 0 ){
perror("ERROR:open\n");
exit(-1);
}
write(fd, &marker, sizeof(marker));
close(fd);
for(i = 0; i < N; i++) {
tempPID = fork();
if (tempPID < 0) {
perror("ERROR: fork\n");
exit(-1);
}
else if (tempPID == 0) { // if child
worker(i);
exit(0);
}
}
for (i = 0 ; i < (M*N); i++) {
semPost(semid, i%N);
semWait(semid, N);
}
printf("Marker = %d\n", marker);
if (semctl( semid, 1, IPC_RMID ) == -1) {
perror("ERROR: semctl free\n");
exit(-1);
}
unlinc(PIPE_NAME);
}
I create N worker processes and each one has to increment the marker value M times. I have to create a pool of 'sleeping' processes and waken them one by one using semaphores but it's all a blur so the current source code is all I came up with... :\
This is a version of the same program but with shared memory instead of pipes:
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/mman.h>
#define N 5
#define M 10
struct sembuf operations;
int semid;
key_t key;
int *sharedmem;
void semWait(int semid, int sempos) {
operations.sem_num = sempos;
operations.sem_op = -1;
operations.sem_flg = 0;
if (semop(semid, &operations, 1) < 0) {
perror("ERROR: semop wait\n");
exit(-1);
}
}
void semPost(int semid, int sempos) {
operations.sem_num = sempos;
operations.sem_op = 1;
operations.sem_flg = IPC_NOWAIT;
if (semop(semid, &operations, 1) < 0) {
perror("ERROR: semop post\n");
exit(-1);
}
}
void worker(int id) {
int j;
for (j = 0 ; j < M; j++) {
semWait(semid, id);
(*sharedmem)++;
semPost(semid, N);
}
}
main() {
int i, tempPID;
int sarray[N+1] = {0};
int protect = PROT_READ | PROT_WRITE;
int flags = MAP_SHARED | MAP_ANONYMOUS;
if ((key = ftok("/dev/null", 4343)) == -1) {
perror("ERROR: ftok\n");
exit(-1);
}
if ((semid = semget(key , N+1, 0666 | IPC_CREAT)) == -1) {
perror("ERROR: semget\n");
exit(-1);
}
if ((semctl(semid, N+1, SETALL, sarray)) < 0) {
perror("ERROR: semctl - val\n");
exit(-1);
}
sharedmem = (int*)mmap(NULL, sizeof(int), protect, flags, 0, 0);
*(sharedmem) = 0;
for(i = 0; i < N; i++) {
tempPID = fork();
if (tempPID < 0) {
perror("ERROR: fork\n");
exit(-1);
}
else if (tempPID == 0) { // if child
worker(i);
exit(0);
}
}
for (i = 0 ; i < (M*N); i++) {
semPost(semid, i%N);
semWait(semid, N);
}
printf("Marker = %d\n", *sharedmem);
if (semctl( semid, 1, IPC_RMID ) == -1) {
perror("ERROR: semctl free\n");
exit(-1);
}
munmap(sharedmem, sizeof(int));
}
Upvotes: 2
Views: 4653
Reputation: 754400
Some of your problems are in the worker code - these two lines:
int fd = open(PIPE_NAME, O_RDWR);
read(fd, &nmarker, sizeof(int));
If you open the pipe for reading and writing, you are asking for trouble (IMNSHO). Open it for reading only, read it, close it. Then open it for writing only, write to it, close it. Now you have to consider where the semaphore operation should occur. You actually need to wake the next process before you try to open the pipe for writing, because the open for writing will block until there is a process available to read from it. Similarly, the process that opens for reading will block until there is a process available to write to it. So, the kernel will coordinate the processes.
You don't check the return value from open()
, so you've no idea whether you got a valid file descriptor. Always check the return status of open()
.
You don't check the return value from read()
, so you've no idea whether you got anything valid off the pipe. Always check the return status of read()
.
(You can decide to ignore the return status of write()
if there is no meaningful error recovery possible for a failed write, but it is not a bad idea to check that it did work. You can decide to ignore the return status of close()
for similar reasons, though you might not get to know about problems until you do the close()
.)
Continuing in the worker code:
for (j = 0 ; j < M; j++) {
semWait(semid, id);
nmarker = nmarker + 1 ;
printf("%d ", marker);
semPost(semid, N);
}
It is surprising to see you printing marker
rather than nmarker
; and surely, basic diagnostic technique prints the value of nmarker
when it is read. You might or might not print j
and nmarker
on each iteration. Note that since nothing in this code increments marker
, the value printed won't change.
The logic sequence here is interesting...it combines with the loop in main()
most oddly. The parent process writes one value to the FIFO. Only one child gets to read that value - the rest get EOF immediately, or hang indefinitely (depending on whether you use O_RDONLY
or O_RDWR
in the children). Each child gets signalled to increment its value, does so, and then goes back to sleep until woken again. There is nothing that sends the incremented value to the next child. So each child is independently incrementing whatever value it chooses - which is probably garbage. With shared memory, if you had a pointer to the shared value, then the increments were seen by all processes at once - that's why it is called shared memory. But here there is no shared memory, so you have to communicate explicitly to get it to work. (I wonder if your FIFO plus shared memory implementation worked because the communication was via shared memory - by accident, in other words?)
So, if the child is to increment the variable it reads each time, it must both read the current value and write the new value each time around the loop. This would be an error-checked read, of course. You might be OK with O_RDWR
because of the semaphores, but I'd personally be happier with the separate opens for read and write - on each iteration if need so be. But I haven't implemented this to check that it really does run into problems; it is simply aconventional to use O_RDWR
on a FIFO.
After your child has incremented its value N times, it writes the result to the pipe.
write(fd, &nmarker, sizeof(nmarker));
close(fd);
The main program then does:
printf("Marker = %d\n", marker);
if (semctl( semid, 1, IPC_RMID ) == -1) {
perror("ERROR: semctl free\n");
exit(-1);
}
unlinc(PIPE_NAME);
Since it has not modified marker
, the value printed will be 0. You should be having the main process read the replies from each of the children.
The correct function for unlinking a FIFO is unlink()
or remove()
.
As noted in a comment, one problem was that opening the FIFO was blocking - no readers. However, that was far from the only problem.
The code below runs. I haven't verified that the number is being incremented as it should (but it is being incremented). I've not checked that every process is getting its turn. I've revised the error handling (one line per call instead of 3 or 4), and added a printing function that includes the PID in the output. I've error checked every system call (but none of the printing statements). I fixed a problem if (fd = open(...) < 0)
. As far as I could tell, closing the FIFO in the master process discarded the content written to it - so the parent no longer closes the FIFO immediately. But mainly I moved the read and write of the FIFO into the worker loop - leaving open and close outside. The code is also laced with diagnostic printing so I can see where it is going wrong when it is going wrong. I haven't done header minimization or any of a number of other cleanups that should occur. However, everything except main()
is static so it doesn't have to be pre-declared. It compiles clean under:
/usr/bin/gcc -O3 -g -std=c99 -Wall -Wextra fifocircle.c -o fifocircle
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/mman.h>
#include <unistd.h>
#include <memory.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <stdarg.h>
#include <errno.h>
#include <string.h>
static const char *arg0 = "undefined";
static void err_error(const char *fmt, ...)
{
int errnum = errno;
va_list args;
fflush(0);
fprintf(stderr, "%s: pid %d:", arg0, (int)getpid());
va_start(args, fmt);
vfprintf(stderr, fmt, args);
va_end(args);
if (errnum != 0)
fprintf(stderr, "(%d: %s)", errnum, strerror(errnum));
fputc('\n', stderr);
exit(1);
}
static void print(const char *fmt, ...)
{
va_list args;
printf("pid %d: ", (int)getpid());
va_start(args, fmt);
vfprintf(stdout, fmt, args);
va_end(args);
fflush(0);
}
#define PIPE_NAME "MY_PIPE"
#define N 5
#define M 10
static struct sembuf operations;
static int semid;
static key_t key;
static int marker;
static void semWait(int semid, int sempos)
{
operations.sem_num = sempos;
operations.sem_op = -1;
operations.sem_flg = 0;
if (semop(semid, &operations, 1) < 0)
err_error("semop wait");
}
static void semPost(int semid, int sempos)
{
operations.sem_num = sempos;
operations.sem_op = 1;
operations.sem_flg = IPC_NOWAIT;
if (semop(semid, &operations, 1) < 0)
err_error("semop post");
}
static void worker(int id)
{
int j;
int fd = open(PIPE_NAME, O_RDWR);
if (fd < 0)
err_error("failed to open FIFO %s for read & write", PIPE_NAME);
print("Worker %d: fd %d\n", id, fd);
for (j = 0 ; j < M; j++)
{
int nmarker;
print("waiting for %d\n", id);
semWait(semid, id);
if (read(fd, &nmarker, sizeof(int)) != sizeof(int))
err_error("short read from FIFO");
print("Got %d from FIFO\n", nmarker);
nmarker = nmarker + 1 ;
if (write(fd, &nmarker, sizeof(nmarker)) != sizeof(nmarker))
err_error("short write to FIFO");
print("Wrote %d to FIFO\n", nmarker);
print("posting %d\n", id);
semPost(semid, N);
}
if (close(fd) != 0)
err_error("failed to close FIFO");
print("done\n");
}
int main(int argc, char **argv)
{
int i;
int sarray[N+1] = {0};
key = 23;
marker = 0;
arg0 = argv[0];
if (argc != 1)
err_error("Usage: %s\n", arg0);
if ((semid = semget(key , N+1, 0666 | IPC_CREAT)) == -1)
err_error("semget");
if ((semctl(semid, N+1, SETALL, sarray)) < 0)
{
perror("ERROR: semctl - val\n");
exit(-1);
}
if (mkfifo(PIPE_NAME, S_IFIFO | 0666) < 0)
err_error("failed to create FIFO %s\n", PIPE_NAME);
print("FIFO created\n");
int fd;
if ((fd = open(PIPE_NAME, O_RDWR)) < 0 )
err_error("failed to open FIFO %s\n", PIPE_NAME);
print("FIFO opened\n");
if (write(fd, &marker, sizeof(marker)) != sizeof(marker))
err_error("short write to FIFO");
print("FIFO loaded\n");
print("Master: about to fork\n");
for (i = 0; i < N; i++)
{
pid_t pid = fork();
if (pid < 0)
err_error("failed to fork");
else if (pid == 0)
{
worker(i);
exit(0);
}
}
print("Master: about to loop\n");
for (i = 0 ; i < (M*N); i++)
{
print("posting to %d\n", i%N);
semPost(semid, i%N);
print("waiting for %d\n", N);
semWait(semid, N);
}
if (close(fd) != 0)
err_error("failed to close FIFO");
print("Marker = %d\n", marker);
if (semctl( semid, 1, IPC_RMID ) == -1)
err_error("semctl remove");
if (unlink(PIPE_NAME) != 0)
err_error("failed to remove FIFO %s", PIPE_NAME);
return(0);
}
Upvotes: 2