Karsten S.

Reputation: 2391

How to fork and read from multiple child processes?

My main goal is to do some (externally dependent, time-expensive) work for a large list of objects. Done straightforwardly, one object at a time, this takes a lot of time, so I decided to go parallel and fork a few (4-8, let's see) child processes, each of which does the job for a smaller subset of the objects. In the main (parent) process I want to print the same overall progress statistics that I had in the one-process version.

However, when I fork 4 child processes and give each of them some work to do, I can see that they are all alive, but only one of them is actually doing something and sending information back to the parent.

Here's the code I have so far - the time-consuming part is mocked with a random usleep, which simulates the real behaviour quite well.

#!/usr/bin/env perl
use strict;
use warnings;

use DateTime;
use DateTime::Format::HTTP;
use Time::HiRes;

my @to_be_processed = (1..300000);
my @queues;
my $nprocs = 4;

my $parent_from_child;
my @child_from_parent;
my @child_to_parent;

$SIG{CHLD} = 'IGNORE'; # auto-reap exited children; the parent never calls wait()
$|=1; # autoflush

my %stat = (
    total           => scalar(@to_be_processed),
    processed       => 0,
    time_started    => [Time::HiRes::gettimeofday],
);

# divide the list into queues for each subprocess
for (my $i = 0; $i < $stat{total}; $i++ ) {
    my $queue = $i % $nprocs;
    push @{$queues[$queue]}, $to_be_processed[$i];
}

# for progress simulation
srand (time ^ $$);

for (my $proc = 0; $proc < $nprocs; $proc++) {

    # set up the pipes
    pipe $parent_from_child, $child_to_parent[$proc]        or die "pipe failed - $!";

    # fork
    defined(my $pid = fork) or die "fork failed - $!";

    if ($pid) {
        # parent
        close $child_to_parent[$proc];
        printf("[%u] parent says: child %u created with pid %u\n", $$, $proc, $pid);
    }
    else {
        # child
        close $parent_from_child;
        open(STDOUT, ">&=" . fileno($child_to_parent[$proc]))   or die "open failed - $!";

        warn(sprintf("[%u] child alive with %u entries\n", $$, scalar(@{$queues[$proc]})));

        foreach my $id (@{$queues[$proc]}) {
            printf("START: %s\n", $id);

            # simulation of progress
            my $random_microseconds = int(rand(3000000))+200000;
            warn(sprintf("[%u] child 'works' for %u microseconds", $$, $random_microseconds));
            Time::HiRes::usleep( $random_microseconds );

            printf("DONE\n")
        }
        exit(0);
    }
}

# parent: receive data from children and print overall statistics
while (<$parent_from_child>) {
    chomp(my $line = $_);

    if ($line =~ m/^START: (\S+)/) {
        my ($id) = ($1);

        printf("%6u/%6u", $stat{processed}, $stat{total});
        if ($stat{time_avg}) {
            my $remaining = ($stat{total} - $stat{processed}) * $stat{time_avg};
            my $eta = DateTime->from_epoch( epoch => time + $remaining );
            $eta->set_time_zone('Europe/Berlin');
            printf(" (ETA %s)", DateTime::Format::HTTP->format_isoz($eta));
        }
        printf("\r");
    }
    elsif ($line =~ /^DONE/) {
        $stat{processed}++;
        $stat{time_processed} = Time::HiRes::tv_interval( $stat{time_started} );
        $stat{time_avg}       = $stat{time_processed} / $stat{processed};
    }
    else {
        printf("%s\n", $line);
    }
}

Normally the warn calls would be removed. If you run this, you should see that only one child actually works. My question is: why? Where is my mistake, and how can I get all of them doing the job?

Thanks K.

Upvotes: 2

Views: 3773

Answers (1)

derobert

Reputation: 51197

You can run perl under strace, and you'll find that your children's lives are fairly short, and look like this:

close(3)                                = 0
ioctl(4, SNDCTL_TMR_TIMEBASE or TCGETS, 0x7fff753b3a10) = -1 EINVAL (Invalid argument)
lseek(4, 0, SEEK_CUR)                   = -1 ESPIPE (Illegal seek)
fstat(4, {st_mode=S_IFIFO|0600, st_size=0, ...}) = 0
dup2(4, 1)                              = 1
dup(4)                                  = 3
fcntl(4, F_GETFD)                       = 0x1 (flags FD_CLOEXEC)
dup2(3, 4)                              = 4
fcntl(4, F_SETFD, FD_CLOEXEC)           = 0
close(3)                                = 0
fcntl(1, F_SETFD, 0)                    = 0
write(2, "[30629] child alive with 75000 e"..., 39) = 39
brk(0x3582000)                          = 0x3582000
write(1, "START: 1\n", 9)               = -1 EPIPE (Broken pipe)
--- SIGPIPE (Broken pipe) @ 0 (0) ---

This is why:

pipe $parent_from_child, $child_to_parent[$proc]        or die "pipe failed - $!";

You've used the array on the wrong argument to pipe. You need to keep all the read sides open in the parent. Instead, you've set up an array so the parent could keep all the write sides open (but then, in your parent block, you immediately close the write side). So the next time through your loop, pipe creates a new handle and assigns it to $parent_from_child. The old value then has no more references, so perl cleans it up, which means it closes the filehandle. That's why all of your children except the last die of SIGPIPE.
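
In other words, the array belongs on the first (read) argument, so the parent ends up holding one read handle per child. Roughly (untested, reusing your variable names, but with $parent_from_child turned into an array):

my @parent_from_child;   # declared before the fork loop, one read end per child

# inside the loop:
pipe $parent_from_child[$proc], $child_to_parent[$proc]
    or die "pipe failed - $!";
# parent: close $child_to_parent[$proc] after the fork
# child:  close $parent_from_child[$proc], write to $child_to_parent[$proc]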

I think you're under the impression you can re-use that read handle and just assign multiple write handles to it. You can't. pipe always makes a new read handle and a new write handle.

If you really want to share the same read handle (you probably don't, as it leads to corruption when output from two children gets interleaved), just create it once, outside of the loop; all the children will inherit the same write handle via fork. More likely, you want one pipe per child, and a select loop in the parent to see which ones have output available and read from those.
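
With IO::Select that parent loop could look roughly like this (an untested sketch; it assumes the @parent_from_child array from above and uses sysread instead of <$fh>, because mixing buffered reads with select can leave lines stuck in the buffer):

use IO::Select;

# @parent_from_child holds one read end per child (see above)
my $sel = IO::Select->new(@parent_from_child);
my %buf;                              # per-handle buffer for partial lines

while ($sel->count) {                 # until every child has closed its pipe
    for my $fh ($sel->can_read) {
        my $n = sysread $fh, my $chunk, 4096;
        if (!$n) {                    # EOF - that child has exited
            $sel->remove($fh);
            next;
        }
        $buf{fileno $fh} .= $chunk;
        while ($buf{fileno $fh} =~ s/^(.*?)\n//) {
            my $line = $1;
            # ... handle START/DONE and update %stat exactly as in
            #     your existing while (<$parent_from_child>) loop ...
        }
    }
}

The children can keep your STDOUT redirection trick; each one just needs to end up writing to its own $child_to_parent[$proc].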

Alternatively, I'm sure CPAN has a ready-made solution (or ten) for you.
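
Parallel::ForkManager, for instance, takes care of the forking and lets each child hand a data structure back to the parent as it exits. A rough, untested sketch (note it reports per-child totals on exit rather than live per-item progress, and you'd drop the $SIG{CHLD} = 'IGNORE', because the module needs to reap its own children):

use Parallel::ForkManager;

my $pm = Parallel::ForkManager->new($nprocs);

# runs in the parent whenever a child exits; the 6th argument is the
# reference the child passed to finish()
$pm->run_on_finish(sub {
    my ($pid, $exit, $ident, $signal, $core, $data) = @_;
    $stat{processed} += $data->{processed} if $data;
});

for my $proc (0 .. $nprocs - 1) {
    $pm->start and next;              # parent: continue with the next child
    # child
    my $done = 0;
    for my $id (@{$queues[$proc]}) {
        # ... the real, expensive work for $id goes here ...
        $done++;
    }
    $pm->finish(0, { processed => $done });   # exits the child
}
$pm->wait_all_children;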

Upvotes: 6
