Reputation: 20921
I try to go in folders and read files using fork()
. I use file tree walk function to go in the folders recursively. The basic idea is that there will be children number of files and directory in a directory. The children read each file seperately and concurrently. But, if there are directories the children will be parents of the children to read files.
static int soner_each_time(const char *filepath, const struct stat *info,
int typeflag, struct FTW *ftwinfo)
{
pid_t pid = 0;
char buf[BUFSIZE];
int status;
int i = 0;
/* The variables are related to functions of reading file */
int totalLines;
char arr[TOTALNUMBEROFLINES][BUFSIZE];
int retval;
const char *const filename = filepath + ftwinfo->base;
if (( pid = fork()) < 0) {
const int cause = errno;
fprintf(stderr, "Fork error: %s\n", strerror(cause));
errno = cause;
return -1;
}
else if( pid > 0 ) // parent
{
if (typeflag == FTW_DP || typeflag == FTW_D)
{
sprintf(buf, "%*s%s\n\n", ftwinfo->level * 4, "", filepath);
write(1, buf, strlen(buf));
pid = wait(&status);
if (pid == -1)
perror("Failed to wait for child");
else if (WIFEXITED(status) && !WEXITSTATUS(status))
printf("parent [%d] reaped child [%d]\n", getpid(), pid);
else if (WIFEXITED(status))
printf("Child %ld terminated with return status %d\n",
(long)pid, WEXITSTATUS(status));
else if (WIFSIGNALED(status))
printf("Child %ld terminated due to uncaught signal %d\n",
(long)pid, WTERMSIG(status));
else if (WIFSTOPPED(status))
printf("Child %ld stopped due to signal %d\n",
(long)pid, WSTOPSIG(status));
}
}
if (pid == 0) // child
{
if (typeflag == FTW_F)
{
sprintf(buf, "||| Child [%d] of parent [%d]: %s |||\n", getpid(), getppid(), filename);
write(1, buf, strlen(buf));
/* Both of them are about reading function */
totalLines = storeLinesInArray(filename, arr);
retval = for_each_file(filename, totalLines, key, arr);
sprintf(buf, "||| Child [%d] of parent [%d] is about to exit |||\n", getpid(), getppid());
write(1, buf, strlen(buf));
}
else if (typeflag == FTW_DP || typeflag == FTW_D)
{
sprintf(buf, "%*s%s\n\n", ftwinfo->level * 4, "", filepath);
write(1, buf, strlen(buf));
}
}
return 0;
}
FTW_DP
and FTW_D
indicates folders FTW_F
indicates files. Basically, I tried in the code fork() every time. If it is parent, It waits for its children read the files. Because the function is recursive, it will fork every calling. But, there's something about it I cant't get it forks more than one for one file. For example there should be one child for 1a.txt
but for this scheme it is 8. Forking subject is really difficult. I do everyday exercises and try to understand it. Your explanations and helps will improve my skill in that branch.
@edit: mcve code
#define _POSIX_C_SOURCE 200809L
#define _XOPEN_SOURCE 700
#include <unistd.h>
#include <dirent.h>
#include <stdlib.h>
#include <locale.h>
#include <string.h>
#include <ftw.h>
#include <stdio.h>
#define TOTALNUMBEROFLINES 1000
#define BUFSIZE 1000
void err_sys(const char *const str)
{
perror(str);
fflush(stdout);
exit(1);
}
int storeLinesInArray(const char *file, char lines[][BUFSIZE])
{
return 0;
}
static int for_each_file(const char *filepath, int totalLines, const char *key, const char arr[][BUFSIZE])
{
fprintf(stdout, "File name is = %s\n", filepath);
fflush(stdout);
return 0;
}
static int soner_each_time(const char *filepath, const struct stat *info,
int typeflag, struct FTW *ftwinfo)
{
pid_t pid = 0;
char buf[BUFSIZE];
int status;
/* The variables are related to functions of reading file */
int totalLines;
char arr[TOTALNUMBEROFLINES][BUFSIZE];
int retval;
const char *const filename = filepath + ftwinfo->base;
if (( pid = fork()) < 0) {
perror("failed fork");
exit(-1);
}
else if( pid > 0 ) // parent
{
if (typeflag == FTW_DP || typeflag == FTW_D)
{
sprintf(buf, "%*s%s\n\n", ftwinfo->level * 4, "", filepath);
write(1, buf, strlen(buf));
pid = wait(&status);
if (pid == -1)
perror("Failed to wait for child");
else if (WIFEXITED(status) && !WEXITSTATUS(status))
printf("parent [%d] reaped child [%d]\n", getpid(), pid);
else if (WIFEXITED(status))
printf("Child %ld terminated with return status %d\n",
(long)pid, WEXITSTATUS(status));
else if (WIFSIGNALED(status))
printf("Child %ld terminated due to uncaught signal %d\n",
(long)pid, WTERMSIG(status));
else if (WIFSTOPPED(status))
printf("Child %ld stopped due to signal %d\n",
(long)pid, WSTOPSIG(status));
}
}
if (pid == 0) // child
{
if (typeflag == FTW_F)
{
sprintf(buf, "||| Child [%d] of parent [%d]: %s |||\n", getpid(), getppid(), filename);
write(1, buf, strlen(buf));
/* Both of them are about reading function */
totalLines = storeLinesInArray(filename, arr);
retval = for_each_file(filename, totalLines, "not needed now", arr);
sprintf(buf, "||| Child [%d] of parent [%d] is about to exit |||\n", getpid(), getppid());
write(1, buf, strlen(buf));
}
else if (typeflag == FTW_DP || typeflag == FTW_D)
{
sprintf(buf, "%*s%s\n\n", ftwinfo->level * 4, "", filepath);
write(1, buf, strlen(buf));
}
}
return 0;
}
int main(int argc, char *argv[])
{
if (nftw("here is directory path", soner_each_time, 15, FTW_CHDIR)) {
fprintf(stderr, "Failed directory.\n");
exit(-1);
}
return 0;
}
Upvotes: 2
Views: 2552
Reputation: 33631
You had a few bugs. The corrected code is below.
The child did not do an exit
call, so it would continue with it's own nftw
, so many files were being redundantly processed. I added the exit(0)
.
fork
s were being done so fast that the system would run out of free pid
s.
I've added three things to fix this:
waitpid(0,&status,WNOHANG)
to catch done childrenfork
to catch the "out of slots" problemI've annotated the source to point out the places were the bugs were.
While not hard bugs, doing a fork
for each file adds significant overhead. The disk bandwidth will saturate with about four active child threads, so using more just slows things down. Forking a child for the directory doesn't do much since the "meaty" processing is going to be for the file.
Anyway, here's the corrected code [please pardon the gratuitous style cleanup]:
#define _POSIX_C_SOURCE 200809L
#define _XOPEN_SOURCE 700
#include <unistd.h>
#include <dirent.h>
#include <stdlib.h>
#include <locale.h>
#include <string.h>
#include <errno.h>
#include <ftw.h>
#include <stdio.h>
#include <sys/wait.h>
#define TOTALNUMBEROFLINES 1000
#define BUFSIZE 1000
// output verbose/debug messages
int opt_v;
// limit of number of children that can be used at one time (if non-zero)
int opt_T;
int pendcnt; // number of active children
void
err_sys(const char *const str)
{
perror(str);
fflush(stdout);
exit(1);
}
int
storeLinesInArray(const char *file, char lines[][BUFSIZE])
{
return 0;
}
static int
for_each_file(const char *filepath, int totalLines, const char *key, const char arr[][BUFSIZE])
{
fprintf(stdout, "File name is = %s\n", filepath);
fflush(stdout);
return 0;
}
// reap_some -- reap a few processes
int
reap_some(int final)
{
pid_t pid;
int status;
int reapcnt;
reapcnt = 0;
// reap all completed children
while (1) {
pid = waitpid(0,&status,WNOHANG);
if (pid == 0)
break;
if (pid == -1) {
if (errno != ECHILD)
perror("Failed to wait for child");
break;
}
if (WIFSIGNALED(status)) {
printf("Child %ld terminated due to uncaught signal %d\n",
(long) pid, WTERMSIG(status));
++reapcnt;
continue;
}
if (WIFSTOPPED(status)) {
printf("Child %ld stopped due to signal %d\n",
(long) pid, WSTOPSIG(status));
continue;
}
if (WIFEXITED(status)) {
++reapcnt;
if (WEXITSTATUS(status) == 0) {
if (opt_v)
printf("parent [%d] reaped child [%d]\n", getpid(), pid);
}
else
printf("Child %ld terminated with return status %d\n",
(long) pid, WEXITSTATUS(status));
continue;
}
}
// bump down the number of children that are "in-flight"
pendcnt -= reapcnt;
return reapcnt;
}
static int
soner_each_time(const char *filepath, const struct stat *info, int typeflag, struct FTW *ftwinfo)
{
pid_t pid = 0;
char *bp;
int lvl;
char buf[BUFSIZE];
/* The variables are related to functions of reading file */
int totalLines;
char arr[TOTALNUMBEROFLINES][BUFSIZE];
int retval;
const char *const filename = filepath + ftwinfo->base;
switch (typeflag) {
case FTW_DP:
case FTW_D:
bp = buf;
for (lvl = 0; lvl < ftwinfo->level; ++lvl)
bp += sprintf(bp," ");
bp += sprintf(bp, "%s\n\n",filepath);
write(1, buf, strlen(buf));
//reap_some(0);
break;
case FTW_F:
// BUGFIX:
// limit the number of in-flight children
// too many children serves no purpose -- they saturate the system
// resources and performance actually goes _down_ because the system
// spends more time doing context switches between them than the actual
// work. more than a few children to process files produces little
// benefit after the disk I/O is running at maximum
if (opt_T) {
while (pendcnt > opt_T)
reap_some(0);
}
// BUGFIX:
// without a throttle, we spawn children so fast we're going to get
// [many] failures here (i.e. we use up _all_ available pids)
while (1) {
pid = fork();
if (pid >= 0)
break;
reap_some(0);
}
// parent
// keep track of the child count
if (pid > 0) {
++pendcnt;
break;
}
// child
sprintf(buf, "||| Child [%d] of parent [%d]: %s |||\n",
getpid(), getppid(), filename);
if (opt_v)
write(1, buf, strlen(buf));
/* Both of them are about reading function */
totalLines = storeLinesInArray(filename, arr);
retval = for_each_file(filename, totalLines, "not needed now", arr);
sprintf(buf, "||| Child [%d] of parent [%d] is about to exit (RETVAL: %d) |||\n", getpid(), getppid(), retval);
if (opt_v)
write(1, buf, strlen(buf));
// BUGFIX:
// child won't exit without this -- causing multiple children to redo
// the same files (i.e. they would continue the nftw -- only parent
// should do that)
exit(0);
break;
}
return 0;
}
int
main(int argc, char **argv)
{
char *cp;
--argc;
++argv;
opt_T = 10;
for (; argc > 0; --argc, ++argv) {
cp = *argv;
if (*cp != '-')
break;
switch (cp[1]) {
case 'T': // throttle
cp += 2;
opt_T = (*cp != 0) ? atoi(cp) : 0;
break;
case 'v': // verbose messages
opt_v = 1;
break;
}
}
cp = *argv;
printf("opt_T=%d opt_v=%d -- %s\n",opt_T,opt_v,cp);
sleep(3);
printf("away we go ...\n");
if (nftw(cp, soner_each_time, 15, FTW_CHDIR)) {
fprintf(stderr, "Failed directory.\n");
exit(1);
}
// wait for all children to complete
while (pendcnt > 0)
reap_some(1);
return 0;
}
UPDATE:
Changed the code to do directory processing in the parent only (i.e. child is forked only for files). Fixed a bug. So, now, the -T throttle parameter works with a much lower value and can be the equivalent of "number of workers". Changed program to use a default throttle value.
UPDATE #2:
I said parent because there is only one parent. I wonder whether I may trace wrong.
No, you are correct. There is only one parent. That was by design.
I would like to make parent for each directory like explained in the first scheme.
Actually, you wouldn't/won't with a proper understanding of what's truly involved. Obi Wan Kenobi: "These are not the droids you're looking for"
There are a number of technical, performance, and system saturation issues with doing a recursive fork on each directory. The example I coded avoids all these with the best compromise for design and performance. It also allowed the master to "run ahead" of the children and keep children as busy as possible, regardless of the number of files/subdirs in a given directory.
Side note: I've got 40+ years experience and I've written a number of nftw
equivalent programs. So, the following comes from all that.
What's the desired end result?
You've only got skeleton code, but what you actually do [intend to do] influences the architecture. Your ultimate program may be:
Also, do you want pre-order or post-order [like FTW_DEPTH] traversal? I presume pre-order
You can no longer use nftw
.
You will need to do your equivalent using opendir/readdir/closedir
[which is what nftw
does].
What you need is a process that does a single level in the hierarchy. It's torture to get nftw
to abort and start a new one to achieve that.
Below is some pseudo code for this.
But ... The implementation becomes more complex and will not provide better performance and may actually degrade performance. It may also cause unrelated programs to crash, such as Firefox, vlc, window managers, etc.
You'll now need interprocess communication and shared memory
With my example above, there was only one control process. To maintain throttling, only a simple increment/decrement of pendcnt
was required.
When you add recursive forks for directories, now any subprocess forked for a directory has to increment/decrement the global copy of pendcnt
in shared memory. It must use an interprocess semaphore to control access to that variable. Or, perhaps, some atomic increment/decrement primitives [ala C11 atomics].
Now, contention for that semaphore becomes a [delay] factor.
Performance:
Having more than a few active processes actually degrades performance. In other words, forking for the directory will actually run slower than a single process.
Beyond a few "worker" processes that do something with a file, the disk I/O bandwidth will be used up. You'll get no further benefit by adding more processes.
With many processes, they may actually interfere with one another. Consider that process A requests a disk read. But, so does process B. A's read completes inside the kernel, but before it can be returned to A, the kernel buffer for A's read has to be repurposed to fulfill B's read. A's read will have to be repeated.
This is what's known as [virtual memory] page "thrashing".
Locking up and crashing the system
As more and more disk I/O is done, more and more kernel buffers have to be used to contain the data. The kernel may have to evict page buffers to make room. Some of them may be for the unrelated programs mentioned above.
In other words, your program's many processes may monopolize the CPU, disk, and memory usage. Some programs like Firefox will timeout [and crash] because they see long delays that they wouldn't see otherwise and assume that something internal to them caused the delay.
I've run such an nftw
program and seen Firefox say: "Killing locked up javascript script".
Worse yet, I've had vlc fall behind in timing and start skipping frames. This caused the window manager to get confused because it thought this was due to some logic error instead of just a very slow response system. The end result was that the window manager aborted and had to be manually restarted.
This can also slow down more critical programs and kernel daemons.
In certain cases, this could only be cleaned up by a system reboot.
Also, running many processes on a system you share with others will turn you into a "bad citizen", so be careful about consuming too many resources.
Anyway, here's the pseudo code:
// pseudo -- loose pseudo-code for non-nftw method
//
// NOTES:
// (1) pendcnt must now be a _shared_ memory variable (e.g. shmget, etc)
// (2) access must be locked by a shared memory semaphore
// (3) we must now have a list of our outstanding children
// (4) we can no longer do a blind waitpid(0,&status,WNOHANG) as we need to
// keep track of when our direct children complete
struct entfile {
struct dirent ent;
struct stat st;
};
// dodir -- enter/exit directory and perform all actions
void
dodir(const char *subdir)
{
// NOTE: you can create a wrapper struct for this that also has stat
struct entfile dirlist[1000];
// add subdir to directory stack ...
dirstack_push(subdir);
// enter directory
chdir(subdir);
// do whatever you'd like ...
process_directory(subdir);
// open directory
dirctl = opendir(".");
// pre-save all entries [skipping "." and ".."]
// this prevents too many open directory descriptors
// NOTE: we should stat(2) the file if d_type not supported
while (1) {
dirent = readdir(dirctl);
stat(dirent->d_name,&st);
add_to_dirent_list(dirlist,dirent,&st);
}
// close directory _before_ we process any entries
closedir(dirctl);
// process all file entries -- pre-order
for (ALL_IN_DIRLIST(ent,dirlist)) {
if (ent->ent.d_type == ISFILE)
doentry(ent);
}
wait_for_all_on_pendlist();
// process all directory entries -- pre-order
for (ALL_IN_DIRLIST(dirent,dirlist)) {
if (ent->ent.d_type == ISDIR)
doentry(ent);
}
wait_for_all_on_pendlist();
// remove directory from stack
dirstack_pop();
// exit directory
chdir("..")
}
// doentry -- process a directory entry
void
doentry(struct entfile *ent)
{
char *tail;
tail = ent->ent.d_name;
do {
// does throttle, etc.
pid = forkme();
// parent
// see notes above
if (pid) {
// NOTE: these semaphore waits can be costly
sem_wait();
++pendcnt;
sem_post();
add_pid_to_pendlist(pid,tail,...);
break;
}
// child
switch (ent->st.st.st_mode & ...) {
case ISFILE:
process_file(tail);
break;
case ISDIR:
dodir(tail);
break;
}
exit(0);
} while (0);
}
// wait for immediate children
void
wait_for_all_on_pendlist(void)
{
while (MORE_IN_PENDLIST) {
for (FORALL_IN_PENDLIST(tsk)) {
pid = waitpid(tsk->pid,&tsk->status,WNOHANG);
// check status like reap_some
if (pid > 0)
remove_pid_from_pendlist(tsk);
}
}
}
Upvotes: 1