Steve P.

Reputation: 14709

Trying to use fork to do a seemingly simple task, but failing miserably

So, basically I have a very large array that I need to read data from. I want to be able to do this in parallel; however, when I tried, I failed miserably. For the sake of simplicity, let's say I have an array with 100 elements in it. My idea was to partition the array into 10 equal parts and read them in parallel (10 is arbitrary, but I don't know how many processes I could run at once and 10 seemed low enough). I need to return a computation (a new data structure) based on my readings from each partition, but I am NOT modifying anything in the original array.

Instead of trying the above exactly, I tried something simpler, but I did it incorrectly, because it didn't work in any capacity. So, then I tried to simply use child processes to push to an array. The code below uses Time::HiRes to see how much faster I can get this to run with forking as opposed to without, but I'm not at that point yet (I'm going to test that once I have closer to a few million entries in my array):

use strict;
use warnings;
use Time::HiRes;

print "Starting main program\n";

my %child;
my @array=();
my $counter=0;
my $start = Time::HiRes::time();

for (my $count = 1; $count <= 10; $count++) 
{
        my $pid = fork();

        if ($pid) 
        {
                $child{$pid}++;
        } 
        elsif ($pid == 0) 
        {
                addToArray(\$counter,\@array); 
                exit 0;
        } 
        else 
        {
                die "couldn't fork: $!\n";
        }
}

while (keys %child)
{
    my $pid = waitpid(-1,0);
    delete $child{$pid};
}

my $stop = Time::HiRes::time();
my $duration = $stop-$start;

print "Time spent: $duration\n";
print "Size of array: ".scalar(@array)."\n";
print "End of main program\n";

sub addToArray 
{
    my $start = shift;
    my $count = ${$start};
    ${$start} += 10;
    my $array = shift;

    for (my $i = $count; $i < $count + 10; $i++)
    {
        push @{$array}, $i;
    }

    print scalar(@{$array})."\n";
}

NB: I used push in lieu of ${$array}[$i]=$i, because I realized that my $counter wasn't actually updating, so that would never work with this code.

I assume this doesn't work because the children are all copies of the original program, so I'm never actually adding anything to the array in my "original program". On that note, I'm very stuck. Again, the actual problem I'm trying to solve is how to partition my array (with data in it), read the partitions in parallel, and return a computation based on my readings (NOTE: I'm not going to modify the original array), but I'm never going to be able to do that if I can't figure out how to actually get my $counter to update. I'd also like to know how to get the code above to do what I want it to do, but that's a secondary goal.

Once I can get my counter to update correctly, is there any chance that another process would start before it updates and I wouldn't actually be reading in the entire array? If so, how do I account for this?

Please, any help would be much appreciated. I'm very frustrated/stuck. I hope there is an easy fix. Thanks in advance.

EDIT: I attempted to use Parallel::ForkManager, but to no avail:

#!/usr/local/roadm/bin/perl
use strict;
use warnings;
use Time::HiRes;
use Parallel::ForkManager;

my $pm = Parallel::ForkManager->new(10);

for (my $count = 1; $count <= 10; $count++) 
{
    my $pid = $pm->start and next;   
    sub1(\$counter,\@array); 
    $pm->finish; # Terminates the child process       
}
  $pm->wait_all_children;  

I didn't include the other extraneous stuff, see above for missing code/sub... Again, help would be much appreciated. I'm very new to this and kind of need someone to hold my hand. I also tried to do something with run_on_start and run_on_finish, but they didn't work either.

Upvotes: 3

Views: 535

Answers (4)

amon

Reputation: 57656

Your code has two issues: your child processes share no data, and even if forked processes did share data, you would have a race condition. The solution is to use threads. Any possibility of race conditions can be eliminated by partitioning the data in the parent thread, and of course, by not using shared data.

Threads

Threads in Perl behave similarly to forking: by default, there is no shared memory. This makes using threads quite easy. However, each thread runs its own perl interpreter, which makes threads quite costly. Use sparingly.

First, we have to activate threading support via use threads. To start a thread, we do threads->create(\&code, @args), which returns a thread object. The code will then run in a separate thread, and will be invoked with the given arguments. After the thread has finished execution, we can collect the return value by calling $thread->join. Note: The context of the threaded code is determined by the create method, not by join.
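As a minimal sketch of that lifecycle (the `sum_of` worker here is made up purely for illustration):

```perl
use strict;
use warnings;
use threads;

# Worker: receives plain copies of its arguments, no shared state.
sub sum_of {
    my $sum = 0;
    $sum += $_ for @_;
    return $sum;
}

# Assigning to a scalar means create() is called in scalar context,
# so the worker runs in scalar context and join() returns a scalar.
my $thr = threads->create(\&sum_of, 1 .. 10);
my $total = $thr->join;    # blocks until the thread finishes
print "total = $total\n";  # 55
```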

We could mark variables with the :shared attribute. Your $counter and @array would be examples for this, but it is generally better to pass explicit copies of data around than to use shared state (disclaimer: from a theoretical standpoint, that is). To avoid race conditions with the shared data, you'd actually have to protect your $counter with a semaphore, but again, there is no need for shared state.
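If you did go the shared-state route anyway, it could look like the following sketch (the worker name, thread count, and loop bounds are all arbitrary):

```perl
use strict;
use warnings;
use threads;
use threads::shared;

my $counter :shared = 0;

sub bump {
    for (1 .. 1000) {
        # lock() holds the variable's lock until the end of the
        # enclosing block (here, one loop iteration), so the
        # increment cannot race with other threads.
        lock($counter);
        $counter++;
    }
}

my @workers = map { threads->create(\&bump) } 1 .. 4;
$_->join for @workers;
print "counter = $counter\n";   # always 4000, thanks to lock()
```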

Here is a toy program showing how you could use threads to parallelize a calculation:

use strict;
use warnings;
use threads;
use 5.010; # for `say`, and sane threads
use Test::More;

# This program calculates differences between elements of an array

my @threads;
my @array = (1, 4, 3, 5, 5, 10, 7, 8);
my @delta = ( 3, -1, 2, 0, 5, -3, 1 );

my $number_of_threads = 3;
my @partitions = partition( $#array, $number_of_threads );
say "partitions: @partitions";

for (my $lower_bound = 0; @partitions; $lower_bound += shift @partitions) {
  my $upper_bound = $lower_bound + $partitions[0];
  say "spawning thread with [@array[$lower_bound .. $upper_bound]]";
  # pass copies of the values in the array slice to new thread:
  push @threads, threads->create(\&differences, @array[$lower_bound .. $upper_bound]);
  # note that threads->create was called in list context
}

my @received;
push @received, $_->join for @threads; # will block until all are finished

is_deeply \@received, \@delta;
done_testing;

# calculates the differences. This doesn't need shared memory.
# note that @array could have been safely accessed, as it is never written to
# If I had written to a (unshared) variable, these changes would have been thread-local
sub differences {
  say "Hi from a worker thread, I have ", 0+@_, " elements to work on";
  return map $_[$_] - $_[$_-1], 1 .. $#_;
  # or more readable:
  # my @d;
  # for my $i (1 .. $#_) {
  #   push @d, $_[$i] - $_[$i-1];
  # }
  # return @d;
}

# divide workload into somewhat fair parts, giving earlier threads more work
sub partition {
  my ($total, $parts) = @_;
  my $base_size = int($total / $parts);
  my @partitions = ($base_size) x $parts;
  $partitions[$_-1]++ for 1 .. $total - $base_size*$parts;
  return @partitions;
}

A note on the number of threads: This should depend on the number of processors of your system. If you have four cores, more than four threads don't make much sense.

Upvotes: 5

Steve P.

Reputation: 14709

So, after my struggle, here's the fix:

EDIT: THIS DOES NOT ACCOMPLISH WHAT I WANTED TO DO

#!/usr/local/roadm/bin/perl
use strict;
use warnings;
use Time::HiRes;
use Parallel::ForkManager;

print "Starting main program\n";
my @array=();
my $counter=0;
my $start = Time::HiRes::time();
my $max_processes=20;
my $partition=10;
my $max_elements=100;

my $pm = Parallel::ForkManager->new($max_processes);

$pm->run_on_start( sub {
          my ($pid, $exit_code, $ident) = @_;
          sub1(\$counter,\@array);  
      });    

while ($counter < $max_elements)
{
    my $pid = $pm->start and next;   

    $pm->finish; # Terminates the child process     
}
  $pm->wait_all_children;

my $stop = Time::HiRes::time();
my $duration = $stop-$start;

print "Time spent: $duration\n";
print "Size of array: ".scalar(@array)."\n";
print "\nEnd of main program\n";

sub sub1 {
    my $start = shift;
    my $count = ${$start};
    ${$start} += $partition;
    my $array = shift;

    for (my $i = $count; $i < $count + $partition; $i++)
    {
        push @{$array}, $i;
    }
    return @{$array};
}

Upvotes: 0

Jonathan Leffler

Reputation: 755114

If you're going to use child processes after forking, each child process is autonomous and has its own copy of the data in the program as of the time it was forked from the main program. The changes made by the child in its own memory have no effect on the parent's memory. If you need that, either you need a threading Perl and to use threads, or you need to think again — maybe using shared memory, but locating Perl data into the shared memory might be tricky.

So, one option is to read all the data into memory before forking off and having the children work on their own copies of the data.

Depending on the structure of the problem, another possibility might be to have each child read and work on a portion of the data. This won't work if each child must have access to all the data.

It isn't clear how much speed up you'll get through threading or forking if the threads or processes are all tied up reading the same file. Getting the data into memory may be best treated as a single-threaded (single-tasking) operation; the parallelism can spring into effect — and yield benefits — once the data is in memory.
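One sketch of the fork-and-report-back approach: each child works on its own slice of the (already loaded) array and sends its partial result to the parent through a pipe. The slicing and the partial-sum computation are illustrative stand-ins for your real calculation:

```perl
use strict;
use warnings;

my @array = (1 .. 100);
my $nkids = 4;
my $chunk = @array / $nkids;    # 25 elements per child

my @readers;
for my $k (0 .. $nkids - 1) {
    pipe(my $r, my $w) or die "pipe: $!";
    my $pid = fork() // die "fork: $!";
    if ($pid == 0) {                      # child: sum one slice
        close $r;
        my $sum = 0;
        $sum += $_ for @array[$k * $chunk .. ($k + 1) * $chunk - 1];
        print {$w} "$sum\n";              # report back to the parent
        close $w;
        exit 0;
    }
    close $w;                             # parent keeps the read end
    push @readers, $r;
}

my $total = 0;
for my $r (@readers) {
    chomp(my $partial = <$r>);
    $total += $partial;
}
wait for 1 .. $nkids;                     # reap the children
print "total = $total\n";                 # 5050
```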

Upvotes: 3

Miguel Prz

Reputation: 13792

There are some CPAN modules that make your life easier. One of them is Parallel::ForkManager, a simple parallel processing fork manager.
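For the problem in the question, the relevant feature is that `$pm->finish` can pass a reference back to the parent, where a `run_on_finish` callback collects it. A rough sketch (doubling each element stands in for the real computation; chunk size and max processes are arbitrary):

```perl
use strict;
use warnings;
use Parallel::ForkManager;

my @array = (1 .. 100);
my $pm = Parallel::ForkManager->new(4);

my @results;
# Runs in the parent for every reaped child; the 6th argument is the
# reference the child handed to finish().
$pm->run_on_finish(sub {
    my ($pid, $exit, $ident, $signal, $core, $data) = @_;
    push @results, @$data if defined $data;
});

my $chunk = 25;
for my $start (grep { $_ % $chunk == 0 } 0 .. $#array) {
    $pm->start and next;                  # parent moves on to next chunk
    my @part = map { $_ * 2 } @array[$start .. $start + $chunk - 1];
    $pm->finish(0, \@part);               # ship the result back
}
$pm->wait_all_children;

print scalar(@results), " results\n";     # 100
```

Note that the partial results arrive in whatever order the children finish, so sort or index them if order matters.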

Upvotes: 0
