traversing directories using fork()

I try to go in folders and read files using fork(). I use file tree walk function to go in the folders recursively. The basic idea is that there will be children number of files and directory in a directory. The children read each file seperately and concurrently. But, if there are directories the children will be parents of the children to read files.

static int soner_each_time(const char *filepath, const struct stat *info,
                    int typeflag, struct FTW *ftwinfo)
{

    pid_t   pid = 0;
    char    buf[BUFSIZE];
    int     status;
    int     i = 0;

    /* The variables are related to functions of reading file  */

    int     totalLines;
    char    arr[TOTALNUMBEROFLINES][BUFSIZE];
    int     retval;


    const char *const filename = filepath + ftwinfo->base;

    if (( pid = fork()) < 0) {
        const int cause = errno;
        fprintf(stderr, "Fork error: %s\n", strerror(cause));
        errno = cause;
        return -1;
    }
    else if( pid > 0 ) // parent
    {
        if (typeflag == FTW_DP || typeflag == FTW_D)
        {
            sprintf(buf, "%*s%s\n\n", ftwinfo->level * 4, "", filepath);
            write(1, buf, strlen(buf));
            pid = wait(&status);
            if (pid == -1)
                perror("Failed to wait for child");
            else if (WIFEXITED(status) && !WEXITSTATUS(status))
                printf("parent [%d] reaped child [%d]\n", getpid(), pid);
            else if (WIFEXITED(status))
                printf("Child %ld terminated with return status %d\n",
                       (long)pid, WEXITSTATUS(status));
            else if (WIFSIGNALED(status))
                printf("Child %ld terminated due to uncaught signal %d\n",
                       (long)pid, WTERMSIG(status));
            else if (WIFSTOPPED(status))
                printf("Child %ld stopped due to signal %d\n",
                       (long)pid, WSTOPSIG(status));
        }
    }


    if (pid == 0) // child
    {
        if (typeflag == FTW_F)
        {
            sprintf(buf, "||| Child [%d] of parent [%d]: %s |||\n", getpid(), getppid(), filename);
            write(1, buf, strlen(buf));

            /* Both of them are about reading function */
            totalLines = storeLinesInArray(filename, arr);

            retval = for_each_file(filename, totalLines, key, arr);

            sprintf(buf, "||| Child [%d] of parent [%d] is about to exit |||\n", getpid(), getppid());
            write(1, buf, strlen(buf));
        }

        else if (typeflag == FTW_DP || typeflag == FTW_D)
        {
            sprintf(buf, "%*s%s\n\n", ftwinfo->level * 4, "", filepath);
            write(1, buf, strlen(buf));
        }

    }
        return 0;
}

FTW_DP and FTW_D indicates folders FTW_F indicates files. Basically, I tried in the code fork() every time. If it is parent, It waits for its children read the files. Because the function is recursive, it will fork every calling. But, there's something about it I cant't get it forks more than one for one file. For example there should be one child for 1a.txt but for this scheme it is 8. Forking subject is really difficult. I do everyday exercises and try to understand it. Your explanations and helps will improve my skill in that branch.

@edit: mcve code

#define _POSIX_C_SOURCE 200809L
#define _XOPEN_SOURCE 700
#include <unistd.h>
#include <dirent.h>
#include <stdlib.h>
#include <locale.h>
#include <string.h>
#include <ftw.h>
#include <stdio.h>

#define TOTALNUMBEROFLINES 1000
#define BUFSIZE 1000


void err_sys(const char *const str)
{
    perror(str);
    fflush(stdout);
    exit(1);
}

int storeLinesInArray(const char *file, char lines[][BUFSIZE])
{
    return 0;
}

static int for_each_file(const char *filepath, int totalLines, const char *key, const char arr[][BUFSIZE])
{

    fprintf(stdout, "File name is = %s\n", filepath);
    fflush(stdout);


    return 0;
}

static int soner_each_time(const char *filepath, const struct stat *info,
                           int typeflag, struct FTW *ftwinfo)
{

    pid_t   pid = 0;
    char    buf[BUFSIZE];
    int     status;

    /* The variables are related to functions of reading file  */

    int     totalLines;
    char    arr[TOTALNUMBEROFLINES][BUFSIZE];
    int     retval;


    const char *const filename = filepath + ftwinfo->base;

    if (( pid = fork()) < 0) {
        perror("failed fork");
        exit(-1);
    }
    else if( pid > 0 ) // parent
    {
        if (typeflag == FTW_DP || typeflag == FTW_D)
        {
            sprintf(buf, "%*s%s\n\n", ftwinfo->level * 4, "", filepath);
            write(1, buf, strlen(buf));
            pid = wait(&status);
            if (pid == -1)
                perror("Failed to wait for child");
            else if (WIFEXITED(status) && !WEXITSTATUS(status))
                printf("parent [%d] reaped child [%d]\n", getpid(), pid);
            else if (WIFEXITED(status))
                printf("Child %ld terminated with return status %d\n",
                       (long)pid, WEXITSTATUS(status));
            else if (WIFSIGNALED(status))
                printf("Child %ld terminated due to uncaught signal %d\n",
                       (long)pid, WTERMSIG(status));
            else if (WIFSTOPPED(status))
                printf("Child %ld stopped due to signal %d\n",
                       (long)pid, WSTOPSIG(status));
        }
    }


    if (pid == 0) // child
    {
        if (typeflag == FTW_F)
        {
            sprintf(buf, "||| Child [%d] of parent [%d]: %s |||\n", getpid(), getppid(), filename);
            write(1, buf, strlen(buf));

            /* Both of them are about reading function */
            totalLines = storeLinesInArray(filename, arr);

            retval = for_each_file(filename, totalLines, "not needed now", arr);

            sprintf(buf, "||| Child [%d] of parent [%d] is about to exit |||\n", getpid(), getppid());
            write(1, buf, strlen(buf));
        }

        else if (typeflag == FTW_DP || typeflag == FTW_D)
        {
            sprintf(buf, "%*s%s\n\n", ftwinfo->level * 4, "", filepath);
            write(1, buf, strlen(buf));
        }

    }
    return 0;
}



int main(int argc, char *argv[])
{

    if (nftw("here is directory path", soner_each_time, 15, FTW_CHDIR)) {
        fprintf(stderr, "Failed directory.\n");
        exit(-1);
    }

    return 0;
}

Upvotes: 2

Views: 2552

Answers (1)

Craig Estey
Craig Estey

Reputation: 33631

You had a few bugs. The corrected code is below.

The child did not do an exit call, so it would continue with it's own nftw, so many files were being redundantly processed. I added the exit(0).

forks were being done so fast that the system would run out of free pids.

I've added three things to fix this:

  1. A "reap" routine that loops on waitpid(0,&status,WNOHANG) to catch done children
  2. Added a loop around the fork to catch the "out of slots" problem
  3. Added a throttling mechanism to limit the number of active children to a sane/useful value

I've annotated the source to point out the places were the bugs were.

While not hard bugs, doing a fork for each file adds significant overhead. The disk bandwidth will saturate with about four active child threads, so using more just slows things down. Forking a child for the directory doesn't do much since the "meaty" processing is going to be for the file.

Anyway, here's the corrected code [please pardon the gratuitous style cleanup]:

#define _POSIX_C_SOURCE 200809L
#define _XOPEN_SOURCE 700
#include <unistd.h>
#include <dirent.h>
#include <stdlib.h>
#include <locale.h>
#include <string.h>
#include <errno.h>
#include <ftw.h>
#include <stdio.h>
#include <sys/wait.h>

#define TOTALNUMBEROFLINES 1000
#define BUFSIZE 1000

// output verbose/debug messages
int opt_v;

// limit of number of children that can be used at one time (if non-zero)
int opt_T;

int pendcnt;                            // number of active children

void
err_sys(const char *const str)
{
    perror(str);
    fflush(stdout);
    exit(1);
}

int
storeLinesInArray(const char *file, char lines[][BUFSIZE])
{
    return 0;
}

static int
for_each_file(const char *filepath, int totalLines, const char *key, const char arr[][BUFSIZE])
{

    fprintf(stdout, "File name is = %s\n", filepath);
    fflush(stdout);

    return 0;
}

// reap_some -- reap a few processes
int
reap_some(int final)
{
    pid_t pid;
    int status;
    int reapcnt;

    reapcnt = 0;

    // reap all completed children
    while (1) {
        pid = waitpid(0,&status,WNOHANG);
        if (pid == 0)
            break;

        if (pid == -1) {
            if (errno != ECHILD)
                perror("Failed to wait for child");
            break;
        }

        if (WIFSIGNALED(status)) {
            printf("Child %ld terminated due to uncaught signal %d\n",
                (long) pid, WTERMSIG(status));
            ++reapcnt;
            continue;
        }

        if (WIFSTOPPED(status)) {
            printf("Child %ld stopped due to signal %d\n",
                (long) pid, WSTOPSIG(status));
            continue;
        }

        if (WIFEXITED(status)) {
            ++reapcnt;
            if (WEXITSTATUS(status) == 0) {
                if (opt_v)
                    printf("parent [%d] reaped child [%d]\n", getpid(), pid);
            }
            else
                printf("Child %ld terminated with return status %d\n",
                    (long) pid, WEXITSTATUS(status));
            continue;
        }
    }

    // bump down the number of children that are "in-flight"
    pendcnt -= reapcnt;

    return reapcnt;
}

static int
soner_each_time(const char *filepath, const struct stat *info, int typeflag, struct FTW *ftwinfo)
{
    pid_t pid = 0;
    char *bp;
    int lvl;
    char buf[BUFSIZE];

    /* The variables are related to functions of reading file */

    int totalLines;
    char arr[TOTALNUMBEROFLINES][BUFSIZE];
    int retval;

    const char *const filename = filepath + ftwinfo->base;

    switch (typeflag) {
    case FTW_DP:
    case FTW_D:
        bp = buf;
        for (lvl = 0;  lvl < ftwinfo->level;  ++lvl)
            bp += sprintf(bp,"    ");
        bp += sprintf(bp, "%s\n\n",filepath);
        write(1, buf, strlen(buf));
        //reap_some(0);
        break;

    case FTW_F:
        // BUGFIX:
        // limit the number of in-flight children
        // too many children serves no purpose -- they saturate the system
        // resources and performance actually goes _down_ because the system
        // spends more time doing context switches between them than the actual
        // work. more than a few children to process files produces little
        // benefit after the disk I/O is running at maximum
        if (opt_T) {
            while (pendcnt > opt_T)
                reap_some(0);
        }

        // BUGFIX:
        // without a throttle, we spawn children so fast we're going to get
        // [many] failures here (i.e. we use up _all_ available pids)
        while (1) {
            pid = fork();
            if (pid >= 0)
                break;
            reap_some(0);
        }

        // parent
        // keep track of the child count
        if (pid > 0) {
            ++pendcnt;
            break;
        }

        // child
        sprintf(buf, "||| Child [%d] of parent [%d]: %s |||\n",
            getpid(), getppid(), filename);
        if (opt_v)
            write(1, buf, strlen(buf));

        /* Both of them are about reading function */
        totalLines = storeLinesInArray(filename, arr);

        retval = for_each_file(filename, totalLines, "not needed now", arr);

        sprintf(buf, "||| Child [%d] of parent [%d] is about to exit (RETVAL: %d) |||\n", getpid(), getppid(), retval);
        if (opt_v)
            write(1, buf, strlen(buf));

        // BUGFIX:
        // child won't exit without this -- causing multiple children to redo
        // the same files (i.e. they would continue the nftw -- only parent
        // should do that)
        exit(0);
        break;
    }

    return 0;
}

int
main(int argc, char **argv)
{
    char *cp;

    --argc;
    ++argv;

    opt_T = 10;

    for (;  argc > 0;  --argc, ++argv) {
        cp = *argv;
        if (*cp != '-')
            break;

        switch (cp[1]) {
        case 'T':  // throttle
            cp += 2;
            opt_T = (*cp != 0) ? atoi(cp) : 0;
            break;
        case 'v':  // verbose messages
            opt_v = 1;
            break;
        }
    }

    cp = *argv;
    printf("opt_T=%d opt_v=%d -- %s\n",opt_T,opt_v,cp);
    sleep(3);
    printf("away we go ...\n");

    if (nftw(cp, soner_each_time, 15, FTW_CHDIR)) {
        fprintf(stderr, "Failed directory.\n");
        exit(1);
    }

    // wait for all children to complete
    while (pendcnt > 0)
        reap_some(1);

    return 0;
}

UPDATE:

Changed the code to do directory processing in the parent only (i.e. child is forked only for files). Fixed a bug. So, now, the -T throttle parameter works with a much lower value and can be the equivalent of "number of workers". Changed program to use a default throttle value.


UPDATE #2:

I said parent because there is only one parent. I wonder whether I may trace wrong.

No, you are correct. There is only one parent. That was by design.

I would like to make parent for each directory like explained in the first scheme.

Actually, you wouldn't/won't with a proper understanding of what's truly involved. Obi Wan Kenobi: "These are not the droids you're looking for"

There are a number of technical, performance, and system saturation issues with doing a recursive fork on each directory. The example I coded avoids all these with the best compromise for design and performance. It also allowed the master to "run ahead" of the children and keep children as busy as possible, regardless of the number of files/subdirs in a given directory.

Side note: I've got 40+ years experience and I've written a number of nftw equivalent programs. So, the following comes from all that.

What's the desired end result?

You've only got skeleton code, but what you actually do [intend to do] influences the architecture. Your ultimate program may be:

  1. CPU bound [constantly waiting for CPU operations like multiplies, etc]
  2. Memory bound [constantly waiting for reads from or writes to DRAM to complete]
  3. I/O bound [constantly waiting for I/O operations to complete]

Also, do you want pre-order or post-order [like FTW_DEPTH] traversal? I presume pre-order

You can no longer use nftw.

You will need to do your equivalent using opendir/readdir/closedir [which is what nftw does].

What you need is a process that does a single level in the hierarchy. It's torture to get nftw to abort and start a new one to achieve that.

Below is some pseudo code for this.

But ... The implementation becomes more complex and will not provide better performance and may actually degrade performance. It may also cause unrelated programs to crash, such as Firefox, vlc, window managers, etc.

You'll now need interprocess communication and shared memory

With my example above, there was only one control process. To maintain throttling, only a simple increment/decrement of pendcnt was required.

When you add recursive forks for directories, now any subprocess forked for a directory has to increment/decrement the global copy of pendcnt in shared memory. It must use an interprocess semaphore to control access to that variable. Or, perhaps, some atomic increment/decrement primitives [ala C11 atomics].

Now, contention for that semaphore becomes a [delay] factor.

Performance:

Having more than a few active processes actually degrades performance. In other words, forking for the directory will actually run slower than a single process.

Beyond a few "worker" processes that do something with a file, the disk I/O bandwidth will be used up. You'll get no further benefit by adding more processes.

With many processes, they may actually interfere with one another. Consider that process A requests a disk read. But, so does process B. A's read completes inside the kernel, but before it can be returned to A, the kernel buffer for A's read has to be repurposed to fulfill B's read. A's read will have to be repeated.

This is what's known as [virtual memory] page "thrashing".

Locking up and crashing the system

As more and more disk I/O is done, more and more kernel buffers have to be used to contain the data. The kernel may have to evict page buffers to make room. Some of them may be for the unrelated programs mentioned above.

In other words, your program's many processes may monopolize the CPU, disk, and memory usage. Some programs like Firefox will timeout [and crash] because they see long delays that they wouldn't see otherwise and assume that something internal to them caused the delay.

I've run such an nftw program and seen Firefox say: "Killing locked up javascript script".

Worse yet, I've had vlc fall behind in timing and start skipping frames. This caused the window manager to get confused because it thought this was due to some logic error instead of just a very slow response system. The end result was that the window manager aborted and had to be manually restarted.

This can also slow down more critical programs and kernel daemons.

In certain cases, this could only be cleaned up by a system reboot.

Also, running many processes on a system you share with others will turn you into a "bad citizen", so be careful about consuming too many resources.


Anyway, here's the pseudo code:

// pseudo -- loose pseudo-code for non-nftw method
//
// NOTES:
// (1) pendcnt must now be a _shared_ memory variable (e.g. shmget, etc)
// (2) access must be locked by a shared memory semaphore
// (3) we must now have a list of our outstanding children
// (4) we can no longer do a blind waitpid(0,&status,WNOHANG) as we need to
//     keep track of when our direct children complete

struct entfile {
    struct dirent ent;
    struct stat st;
};

// dodir -- enter/exit directory and perform all actions
void
dodir(const char *subdir)
{
    // NOTE: you can create a wrapper struct for this that also has stat
    struct entfile dirlist[1000];

    // add subdir to directory stack ...
    dirstack_push(subdir);

    // enter directory
    chdir(subdir);

    // do whatever you'd like ...
    process_directory(subdir);

    // open directory
    dirctl = opendir(".");

    // pre-save all entries [skipping "." and ".."]
    // this prevents too many open directory descriptors
    // NOTE: we should stat(2) the file if d_type not supported
    while (1) {
        dirent = readdir(dirctl);
        stat(dirent->d_name,&st);
        add_to_dirent_list(dirlist,dirent,&st);
    }

    // close directory _before_ we process any entries
    closedir(dirctl);

    // process all file entries -- pre-order
    for (ALL_IN_DIRLIST(ent,dirlist)) {
        if (ent->ent.d_type == ISFILE)
            doentry(ent);
    }
    wait_for_all_on_pendlist();

    // process all directory entries -- pre-order
    for (ALL_IN_DIRLIST(dirent,dirlist)) {
        if (ent->ent.d_type == ISDIR)
            doentry(ent);
    }
    wait_for_all_on_pendlist();

    // remove directory from stack
    dirstack_pop();

    // exit directory
    chdir("..")
}

// doentry -- process a directory entry
void
doentry(struct entfile *ent)
{
    char *tail;

    tail = ent->ent.d_name;

    do {
        // does throttle, etc.
        pid = forkme();

        // parent
        // see notes above
        if (pid) {
            // NOTE: these semaphore waits can be costly
            sem_wait();
            ++pendcnt;
            sem_post();

            add_pid_to_pendlist(pid,tail,...);
            break;
        }

        // child
        switch (ent->st.st.st_mode & ...) {
        case ISFILE:
            process_file(tail);
            break;
        case ISDIR:
            dodir(tail);
            break;
        }

        exit(0);
    } while (0);
}

// wait for immediate children
void
wait_for_all_on_pendlist(void)
{

    while (MORE_IN_PENDLIST) {
        for (FORALL_IN_PENDLIST(tsk)) {
            pid = waitpid(tsk->pid,&tsk->status,WNOHANG);

            // check status like reap_some
            if (pid > 0)
                remove_pid_from_pendlist(tsk);
        }
    }
}

Upvotes: 1

Related Questions