bendour

Reputation: 61

How to synchronize multiple processes in C?

I have a table of car structures in shared memory. I call fork() 10 times and wait until all the children are done before displaying the results. (The result is just the total time for each car; it differs from car to car because I generate random numbers for each one.)

I use a loop to generate numbers and add them to each car's totalTime. But I don't want one process to have to finish its loop before the next one gets access. I want all the processes to run in sync.

I think I should use a semaphore, but after reading the documentation I still don't know how to implement one.

Can someone help me please?

#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <unistd.h>
#include <sys/wait.h>

#define NUMBER_OF_CARS 10
#define MIN 25 // time generator
#define MAX 40

typedef struct {
    unsigned int totalTime;
} car;

car *shared_memory;

int drive(int i);
unsigned int generateNumber(void);
void display();

int main(void)
{
/***************************************************
*           Creating shared memory        *
****************************************************/
    int segment_id = shmget(IPC_PRIVATE, sizeof(car) * NUMBER_OF_CARS, 0666 | IPC_CREAT);
    if (segment_id == -1) {
        perror("shmget() failed !");
        exit(EXIT_FAILURE);
    }

    shared_memory = shmat(segment_id, NULL, 0);
    if (shared_memory == (void *) (-1)) {
        perror("shmat() failed !");
        exit(EXIT_FAILURE);
    }

    /**********************************************************
     *               Creation of child / cars              *
     **********************************************************/
    for (int i = 0; i < NUMBER_OF_CARS; ++i)
    {
        /********  fork failure *********/
        pid_t pid = fork();
        if (pid == -1) {
            perror("fork failed !");
            exit(EXIT_FAILURE);
        }

        /********  the child *********/
        if(pid == 0) {
            drive(i);
            exit(EXIT_SUCCESS);
        }

        wait(NULL);
        display();
        sleep(1);
    }


    /********  Detach memory segments  *********/
    shmdt(shared_memory);
    /********  Delete shared memory  *********/
    shmctl(segment_id, IPC_RMID, NULL);

    exit(EXIT_SUCCESS);
}

unsigned int timeMaxi = 5400;

int drive( int i ) {
    srand(time(NULL) + getpid());

    unsigned int number;

    // I want it to be executed by all processes at the same time!
    while ( shared_memory[i].totalTime <= timeMaxi )
    {
        number = generateNumber();
        shared_memory[i].totalTime += number;
    }
    return 0;
}

void display(void) {
    for (int i = 0; i < NUMBER_OF_CARS; i++){
        printf("Total %d : %d\n", i, shared_memory[i].totalTime);
    }
}

unsigned int generateNumber(void)
{
    return rand()%(MAX-MIN+1)+MIN;
}

Upvotes: 1

Views: 1320

Answers (1)

Craig Estey

Reputation: 33601

AFAICT, you do not need synchronization because each process is incrementing a different element of the array (indexed by i).

But, your placement of wait(NULL) (and display) will force all processes to be executed sequentially rather than in parallel.

So, remove the sleep. Move the wait into a loop of its own (after your for loop) and then do display:

while (wait(NULL) >= 0);
display();

In other words:

for (int i = 0; i < NUMBER_OF_CARS; ++i) {
    /********  fork failure *********/
    pid_t pid = fork();

    if (pid == -1) {
        perror("fork failed !");
        exit(EXIT_FAILURE);
    }

    /********  the child *********/
    if (pid == 0) {
        drive(i);
        exit(EXIT_SUCCESS);
    }
}

while (wait(NULL) >= 0);
display();
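The wait loop above can be made a little more defensive. What follows is a minimal sketch, not part of the original program: the helper names spawn_children and reap_all_children are hypothetical. It retries when wait() is interrupted by a signal (EINTR) and stops once no children remain (ECHILD).

```c
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// Hypothetical helper: fork n children that exit immediately.
// Returns how many children were actually started.
int spawn_children(int n)
{
    assert(n >= 0);

    int started = 0;
    for (int i = 0; i < n; ++i) {
        pid_t pid = fork();
        if (pid == -1)
            break;                  // stop forking; caller still reaps the rest
        if (pid == 0)
            _exit(EXIT_SUCCESS);    // child: a real program would call drive(i) first
        ++started;
    }
    return started;
}

// Hypothetical helper: wait for every child, return how many were reaped.
int reap_all_children(void)
{
    int reaped = 0;

    for (;;) {
        pid_t pid = wait(NULL);
        if (pid > 0) {
            ++reaped;
            continue;
        }
        if (errno == EINTR)
            continue;               // interrupted by a signal: try again
        break;                      // ECHILD: no children left
    }
    return reaped;
}
```

Calling spawn_children(NUMBER_OF_CARS) followed by reap_all_children() and then display() keeps the same "fork everything first, wait afterwards" shape as the loop above.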

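If you're paranoid and wish to avoid any cache snooping/coherency issues, you could use stdatomic.h primitives. (Strictly speaking, totalTime should then be declared with an atomic type such as atomic_uint. GCC accepts a plain unsigned int here, but other compilers may reject it.)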

#include <stdatomic.h>

unsigned int timeMaxi = 5400;

int
drive(int i)
{
    srand(time(NULL) + getpid());

    unsigned int number;

    // I want it to be executed by all processes at the same time!
#if 0
    while (shared_memory[i].totalTime <= timeMaxi) {
        number = generateNumber();
        shared_memory[i].totalTime += number;
    }
#else
    while (1) {
        number = generateNumber();
        number += atomic_fetch_add(&shared_memory[i].totalTime,number);
        if (number > timeMaxi)
            break;
    }
#endif

    return 0;
}
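For reference, here is a sketch of the same loop with the field declared as an atomic type, which is what C11 formally requires for atomic_fetch_add. The names atomic_car and drive_atomic are assumptions made for this illustration; the original struct uses a plain unsigned int.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdlib.h>

#define MIN 25
#define MAX 40

// Assumption: unlike the original car struct, totalTime is atomic here.
typedef struct {
    atomic_uint totalTime;
} atomic_car;

static unsigned int generateNumber(void)
{
    return (unsigned int)(rand() % (MAX - MIN + 1) + MIN);
}

// Add random increments until the total exceeds limit.
// Returns the total as observed by this caller after its last addition.
unsigned int drive_atomic(atomic_car *c, unsigned int limit)
{
    assert(c != NULL);

    for (;;) {
        unsigned int number = generateNumber();

        // atomic_fetch_add returns the value held *before* the addition
        unsigned int total = atomic_fetch_add(&c->totalTime, number) + number;

        if (total > limit)
            return total;
    }
}
```

With one process per car, as in the question, the atomic add behaves the same as the plain loop; it only starts to matter if several processes update the same element.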

UPDATE:

Thank you for your reply! I understand that in this program it is not necessary to use the semaphore. But I would like to use them to understand how it works. so if you can help me implement them. – bendour

Okay ... But, be careful what you wish for--you may actually get it :-)

Things would be easier with threads. Using the stdatomic.h primitives would allow more/better parallelism. We could create another shared memory area and use sem_* [posix] semaphores. But, since the original code used SysV shared memory areas, it seems fitting to use SysV semaphores for synchronization.
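For comparison, the POSIX alternative mentioned above might look roughly like this. It is only a sketch under assumptions: the shared area comes from an anonymous mmap() rather than SysV shmget(), and the names shared_block and run_posix_sem_demo are invented for the example. On older glibc versions it needs to be linked with -pthread.

```c
#ifndef _DEFAULT_SOURCE
#define _DEFAULT_SOURCE         // for MAP_ANONYMOUS under strict -std= modes
#endif
#include <assert.h>
#include <semaphore.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// Hypothetical shared block: one semaphore guarding one counter.
typedef struct {
    sem_t lock;
    unsigned int counter;
} shared_block;

// Fork nproc children; each adds 1 to the counter iters times while
// holding the semaphore. Returns the final counter, or -1 on error.
long run_posix_sem_demo(int nproc, int iters)
{
    assert(nproc > 0 && iters > 0);

    shared_block *blk = mmap(NULL, sizeof(*blk), PROT_READ | PROT_WRITE,
                             MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    if (blk == MAP_FAILED)
        return -1;

    blk->counter = 0;

    // second argument 1: shared between processes; third: initial value 1
    if (sem_init(&blk->lock, 1, 1) == -1) {
        munmap(blk, sizeof(*blk));
        return -1;
    }

    int started = 0;
    for (int p = 0; p < nproc; ++p) {
        pid_t pid = fork();
        if (pid == -1)
            break;
        if (pid == 0) {
            for (int i = 0; i < iters; ++i) {
                sem_wait(&blk->lock);       // acquire
                blk->counter += 1;          // critical section
                sem_post(&blk->lock);       // release
            }
            _exit(EXIT_SUCCESS);
        }
        ++started;
    }

    while (wait(NULL) > 0)
        ;                                   // reap every child

    long result = (started == nproc) ? (long) blk->counter : -1;

    sem_destroy(&blk->lock);
    munmap(blk, sizeof(*blk));
    return result;
}
```

Because every increment happens under the semaphore, run_posix_sem_demo(4, 1000) should return 4000 when all forks succeed.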

I created a version that uses SysV semaphores: semget, semctl, semop. The actual mechanics of using these primitives are relatively easy/simple. See the init, acquire, and release functions in the sample code below.
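As a stripped-down illustration of those mechanics, here is a sketch of a single binary semaphore. It is not the code from the full program: the binsem_* names are hypothetical, it assumes Linux/glibc (where the caller has to define union semun itself), and it adds SEM_UNDO, which the full program does not use.

```c
#include <assert.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>

// Assumption: Linux/glibc, where the program must define union semun
// itself (see semctl(2)). Some other systems already define it.
union semun {
    int val;
    struct semid_ds *buf;
    unsigned short *array;
};

// Create a private set holding one semaphore with initial value 1.
// Returns the set ID, or -1 on failure.
int binsem_create(void)
{
    int id = semget(IPC_PRIVATE, 1, 0600 | IPC_CREAT);
    if (id == -1)
        return -1;

    union semun arg;
    arg.val = 1;
    if (semctl(id, 0, SETVAL, arg) == -1) {
        semctl(id, 0, IPC_RMID);
        return -1;
    }
    return id;
}

// Apply one operation (-1 = acquire, +1 = release) to semaphore 0.
static int binsem_op(int id, short delta)
{
    struct sembuf sop;

    sop.sem_num = 0;
    sop.sem_op = delta;
    sop.sem_flg = SEM_UNDO;     // kernel reverts the op if the process dies

    return semop(id, &sop, 1);
}

int binsem_acquire(int id)
{
    assert(id >= 0);
    return binsem_op(id, -1);
}

int binsem_release(int id)
{
    assert(id >= 0);
    return binsem_op(id, +1);
}

// Current value of semaphore 0, or -1 on error.
int binsem_value(int id)
{
    return semctl(id, 0, GETVAL);
}

// Remove the whole set; returns 0 on success.
int binsem_remove(int id)
{
    return semctl(id, 0, IPC_RMID);
}
```

SEM_UNDO is a design choice worth considering: if a child dies while holding the semaphore, the kernel undoes its decrement, so the other processes are not left blocked forever.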

However, I should point out that the processes can end up "racing": when we use semaphores to sync things, there is no guarantee of "fairness". Consider the following loop:

while (1) {
    // (1) lock semaphore
    // (2) do stuff ...
    // (3) unlock semaphore
}

Suppose we have two [or more] processes running the above loop.

If process A locks, does stuff, and unlocks, it can monopolize the lock. That is, after process A unlocks at step (3), it can win the race and immediately (re)lock at step (1) of the next iteration.

Other processes contending for the lock can be held off indefinitely because process A unlocks and immediately relocks the semaphore.

This can be hard to detect/measure. Interspersing debug printf calls will disturb the [real] timing.

A technique I've used in realtime/production code is to have a high speed "event trace queue". I've added debug code that supports that (the various evt* functions below). In lieu of debug printf calls, I've added calls that store away a history of the program execution (with timestamps), similar to what a H/W logic analyzer does (or Sun's dtrace).

When running, these trace calls are much faster than printf calls. For example, on my system, adding a trace entry to the queue only takes 90 nanoseconds.

When the program/process completes, it dumps the trace queue to files that can be later post processed [with the evtpost script below].

After learning how to interpret the trace data, you'll [probably] notice that once a process gets the lock, it tends to be able to relock it and do more work at the expense of the other processes.

So, instead of getting a clean/balanced interleaving among the processes, the other processes are held off. This is known as thread/process "starvation".

Instead of getting a maximally parallel use of multiple processes running, the actual execution is much more "bursty" and/or sequential. This tends to defeat the whole purpose of having multiple processes in the first place.

That is why I [originally] suggested the "lockless" and stdatomic.h approaches. Those are but a few of the [known] techniques to alleviate the problem.
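One other well-known way to get fairness is a "ticket lock", which serves waiters strictly in arrival order. The sketch below is a swapped-in technique, not something the program in this answer uses. It assumes lock-free atomics that work across processes in an anonymous shared mmap() (true on mainstream platforms), and the names ticket_block and run_ticket_demo are made up for the example.

```c
#ifndef _DEFAULT_SOURCE
#define _DEFAULT_SOURCE         // for MAP_ANONYMOUS under strict -std= modes
#endif
#include <assert.h>
#include <sched.h>
#include <stdatomic.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// Hypothetical shared block: ticket lock plus the data it protects.
typedef struct {
    atomic_uint next_ticket;    // next ticket to hand out
    atomic_uint now_serving;    // ticket currently allowed in
    unsigned int counter;       // protected by the lock
} ticket_block;

static void ticket_acquire(ticket_block *b)
{
    // take a ticket, then wait until it is our turn (FIFO order)
    unsigned int me = atomic_fetch_add(&b->next_ticket, 1);

    while (atomic_load(&b->now_serving) != me)
        sched_yield();          // give the current holder a chance to run
}

static void ticket_release(ticket_block *b)
{
    atomic_fetch_add(&b->now_serving, 1);   // admit the next ticket
}

// Fork nproc children; each adds 1 to the counter iters times under the
// ticket lock. Returns the final counter, or -1 on error.
long run_ticket_demo(int nproc, int iters)
{
    assert(nproc > 0 && iters > 0);

    ticket_block *b = mmap(NULL, sizeof(*b), PROT_READ | PROT_WRITE,
                           MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    if (b == MAP_FAILED)
        return -1;

    atomic_init(&b->next_ticket, 0);
    atomic_init(&b->now_serving, 0);
    b->counter = 0;

    int started = 0;
    for (int p = 0; p < nproc; ++p) {
        pid_t pid = fork();
        if (pid == -1)
            break;
        if (pid == 0) {
            for (int i = 0; i < iters; ++i) {
                ticket_acquire(b);
                b->counter += 1;
                ticket_release(b);
            }
            _exit(EXIT_SUCCESS);
        }
        ++started;
    }

    while (wait(NULL) > 0)
        ;                       // reap every child

    long result = (started == nproc) ? (long) b->counter : -1;
    munmap(b, sizeof(*b));
    return result;
}
```

A process that releases and immediately tries to relock has to take a new, later ticket, so it cannot jump ahead of processes that are already waiting. The trade-offs are busy-waiting and the fact that a process dying while holding its ticket blocks everyone behind it.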

Anyway, here is the updated program:

#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <unistd.h>
#include <sys/wait.h>

#include <sys/sem.h>
#include <stdatomic.h>
#include <errno.h>
#include <signal.h>
#include <sys/file.h>

#define NUMBER_OF_CARS 10
#define MIN 25                          // time generator
#define MAX 40

#define MAXSEM      2                   // number of semaphores

typedef struct {
    unsigned int totalTime;
} car;

car *shared_memory;

int drive(void);
unsigned int generateNumber(void);
void display(void);

int carno;                              // current car number
int sem_id;                             // semaphore set ID
int acqrel;                             // acquire/release sequence number

typedef long long s64;
typedef s64 tsc_t;

tsc_t tsczero;

// tracing event
typedef struct {
    tsc_t evt_tsc;                      // event time
    const char *evt_name;               // event name
    int evt_carno;                      // car number
    s64 evt_val;                        // event value
} evt_t, *evt_p;

#define EVTMAX      10000
evt_t evtlist[EVTMAX];
int evtidx_enq;                         // enqueue index
int evtidx_deq;                         // dequeue index
FILE *evtxf;                            // evtdump stream

#if EVT
#define evtenq(_name,_val) \
    _evtenq(_name,_val)
#else
#define evtenq(_name,_val) \
    do { } while (0)
#endif

#define sysfault(_fmt...) \
    do { \
        fprintf(stderr,"%2.2d ",carno); \
        fprintf(stderr,_fmt); \
        exit(1); \
    } while (0)

// tscget -- get hires timestamp
tsc_t
tscget(void)
{
    struct timespec ts;
    tsc_t tsc;

    clock_gettime(CLOCK_MONOTONIC,&ts);

    tsc = ts.tv_sec;
    tsc *= 1000000000;
    tsc += ts.tv_nsec;

    tsc -= tsczero;

    return tsc;
}

// tscsec -- convert timestamp to fractional seconds
double
tscsec(tsc_t tsc)
{
    double sec;

    sec = tsc;
    sec /= 1e9;

    return sec;
}

// evtinc -- increment queue index with wrap
int
evtinc(int idx)
{

    idx += 1;
    idx %= EVTMAX;

    return idx;
}

// _evtenq -- enqueue trace event
evt_p
_evtenq(const char *name,s64 val)
{
    int enqcur = evtidx_enq;
    int enqnext = evtinc(enqcur);
    evt_p evt;

    // pop element off queue to make room if queue is full
    if (enqnext == evtidx_deq)
        evtidx_deq = evtinc(evtidx_deq);

    evt = &evtlist[enqcur];
    evtidx_enq = enqnext;

    evt->evt_tsc = tscget();

    evt->evt_name = name;
    evt->evt_carno = carno;

    evt->evt_val = val;

    return evt;
}

// evtdeq -- dequeue trace event
evt_p
evtdeq(void)
{
    evt_p evt;

    do {
        // bug out on empty queue
        if (evtidx_deq == evtidx_enq) {
            evt = NULL;
            break;
        }

        evt = &evtlist[evtidx_deq];
        evtidx_deq = evtinc(evtidx_deq);
    } while (0);

    return evt;
}

// evtdump -- dump out trace queue
void
evtdump(void)
{
    FILE *xf;
    int openflg;
    char file [100];
    evt_p evt;

    xf = evtxf;
    openflg = (xf == NULL);
    if (openflg) {
        sprintf(file,"evtdump_%2.2d",carno);
        xf = fopen(file,"w");
    }

    flock(fileno(xf),LOCK_EX);

    while (1) {
        evt = evtdeq();
        if (evt == NULL)
            break;

        fprintf(xf,"[%.9f/%2.2d] %s %lld\n",
            tscsec(evt->evt_tsc),evt->evt_carno,evt->evt_name,evt->evt_val);
    }

    fflush(xf);
    flock(fileno(xf),LOCK_UN);

    if (openflg)
        fclose(xf);
}

// init -- initialize semaphore set
void
init(void)
{
    int err;
    int val;

    sem_id = semget(IPC_PRIVATE, MAXSEM, 0666 | IPC_CREAT);
    if (sem_id == -1) {
        perror("semget() failed !");
        exit(EXIT_FAILURE);
    }

    for (int semnum = 0;  semnum < MAXSEM;  ++semnum) {
        // get the old value
        err = semctl(sem_id,semnum,GETVAL,NULL);
        printf("init: semnum=%d err=%d\n",semnum,err);

        // set initial value (otherwise, we will hang in acquire)
        val = 1;
        err = semctl(sem_id,semnum,SETVAL,val);

        // show the new value
        err = semctl(sem_id,semnum,GETVAL,NULL);
        printf("init: semnum=%d err=%d\n",semnum,err);
    }
}

// acquire -- acquire the semaphore
void
acquire(int semnum)
{
    struct sembuf sop;
    int err;

    ++acqrel;

    sop.sem_num = semnum;
    sop.sem_op = -1;
    sop.sem_flg = 0;

    evtenq("acquire/BEG",acqrel);
    err = semop(sem_id,&sop,1);
    if (err < 0)
        err = errno;
    evtenq("acquire/END",acqrel);

    if (err)
        sysfault("acquire: fault semnum=%d -- %s\n",semnum,strerror(err));
}

// release -- release the semaphore
void
release(int semnum)
{
    struct sembuf sop;
    int err;

    sop.sem_num = semnum;
    sop.sem_op = 1;
    sop.sem_flg = 0;

    evtenq("release/BEG",acqrel);
    err = semop(sem_id,&sop,1);
    if (err < 0)
        err = errno;
    evtenq("release/END",acqrel);

    if (err)
        sysfault("release: fault semnum=%d -- %s\n",semnum,strerror(err));
}

void
sighdr(int signo)
{

    evtdump();
    exit(9);
}

int
main(void)
{

    setlinebuf(stdout);
    setlinebuf(stderr);

    tsc_t tsc = tscget();
#if 0
    srand(tsc);
#endif
    tsczero = tsc;

    signal(SIGINT,sighdr);

    init();

#if 0
    evtxf = fopen("evtdump","w");
#endif

    // Creating shared memory
    int segment_id = shmget(IPC_PRIVATE, sizeof(car) * NUMBER_OF_CARS,
        0666 | IPC_CREAT);

    if (segment_id == -1) {
        perror("shmget() failed !");
        exit(EXIT_FAILURE);
    }

    shared_memory = shmat(segment_id, NULL, 0);
    if (shared_memory == (void *) (-1)) {
        perror("shmat() failed !");
        exit(EXIT_FAILURE);
    }

    /**********************************************************
     *               Creation of child / cars              *
     **********************************************************/
    for (carno = 0; carno < NUMBER_OF_CARS; ++carno) {
        /********  fork failure *********/
        pid_t pid = fork();

        if (pid == -1) {
            perror("fork failed !");
            exit(EXIT_FAILURE);
        }

        /********  the child *********/
        if (pid == 0) {
            drive();
            exit(EXIT_SUCCESS);
        }

#if 0
        wait(NULL);
        display();
        sleep(1);
#endif
    }

#if 1
    while (wait(NULL) >= 0);
    display();
#endif

    /********  Detach memory segments  *********/
    shmdt(shared_memory);
    /********  Delete shared memory  *********/
    shmctl(segment_id, IPC_RMID, NULL);

    // delete the semaphore set (IPC_RMID removes the whole set; semnum is ignored)
    semctl(sem_id, 0, IPC_RMID);

    if (evtxf != NULL)
        fclose(evtxf);

    exit(EXIT_SUCCESS);
}

unsigned int timeMaxi = 5400;

int
drive(void)
{
#if 1
    srand(tscget() + tsczero + getpid());
#endif

    unsigned int number;

    for (int idx = 0;  idx <= 10;  ++idx)
        evtenq("drive/TEST",idx);

    car *car = &shared_memory[carno];

    // I want it to be executed by all processes at the same time!

    // non-synced version
#if 0
    while (car->totalTime <= timeMaxi) {
        number = generateNumber();
        car->totalTime += number;
    }
#endif

    // atomic primitive version
#if 0
    while (1) {
        number = generateNumber();
        number += atomic_fetch_add(&car->totalTime,number);
        if (number > timeMaxi)
            break;
    }
#endif

    // acquire/release semaphore version
#if 1
    while (1) {
        acquire(0);

        if (car->totalTime > timeMaxi) {
            release(0);
            break;
        }

        number = generateNumber();
        car->totalTime += number;

        release(0);
    }
#endif

    evtdump();
    exit(0);

    return 0;
}

void
display(void)
{
    for (int i = 0; i < NUMBER_OF_CARS; i++) {
        printf("Total %d : %u\n", i, shared_memory[i].totalTime);
    }
}

unsigned int
generateNumber(void)
{
    return rand() % (MAX - MIN + 1) + MIN;
}

When compiled with -DEVT, the trace event mechanism is enabled. Each car process then writes its trace data to its own evtdump_NN file (NN being the car number).

Here is a [perl] script that can post process/merge these files. Its output allows us to see how each process interacts timewise:

#!/usr/bin/perl
# sysvsem/evtpost -- post process evtdump files

master(@ARGV);
exit(0);

# master -- master control
sub master
{
    my(@argv) = @_;

    $xfile = shift(@argv);
    $xfile //= "sysvsem";

    @tails = dirload(".");
    foreach $tail (@tails) {
        next unless ($tail =~ /^evtdump/);
        open($xfsrc,"<$tail");

        while ($bf = <$xfsrc>) {
            chomp($bf);
            push(@evt,$bf);
        }

        close($xfsrc);
    }

    @evt = sort(evtsort @evt);

    foreach $evt (@evt) {
        print($evt,"\n");
    }
}

sub evtsort
{
    my($lhs,$rhs);
    my($cmpflg);

    $lhs = _evtsort($a);
    $rhs = _evtsort($b);

    $cmpflg = $lhs <=> $rhs;

    $cmpflg;
}

sub _evtsort
{
    my($bf) = @_;
    my($av);
    my($val);

    ($av) = split(" ",$bf);

    if ($av =~ s/^\[//) {
        $av =~ s/\]$//;
        ($val) = split("/",$av);
    }

    die(sprintf("_evtsort: bf='%s'\n",$bf))
        unless (defined($val));

    $val;
}

sub dirload
{
    my($dir) = @_;
    my($xfdir);
    my($tail);
    my(@tails);

    opendir($xfdir,$dir);

    while ($tail = readdir($xfdir)) {
        next if ($tail eq ".");
        next if ($tail eq "..");
        push(@tails,$tail);
    }

    closedir($xfdir);

    @tails = sort(@tails);

    @tails;
}

Upvotes: 1
