Reputation: 14709
I have a very large array that I need to read data from. I want to do this in parallel, but my attempts so far have failed. For the sake of simplicity, let's say the array has 100 elements. My idea was to partition the array into 10 equal parts and read them in parallel (10 is arbitrary; I don't know how many processes I can run at once, and 10 seemed low enough). I need to return a computation (a new data structure) based on what I read from each partition, but I am NOT modifying anything in the original array.
Instead of trying exactly the above, I tried something simpler, but it didn't work in any capacity. So then I tried to simply use child processes to push to an array. The code below uses Time::HiRes to see how much faster I can get this to run with forking as opposed to without, but I'm not at that point yet (I'm going to test that when I have closer to a few million entries in my array):
use strict;
use warnings;
use Time::HiRes;

print "Starting main program\n";
my %child;
my @array=();
my $counter=0;
my $start = Time::HiRes::time();
for (my $count = 1; $count <= 10; $count++)
{
    my $pid = fork();
    if ($pid)
    {
        $child{$pid}++;
    }
    elsif ($pid == 0)
    {
        addToArray(\$counter,\@array);
        exit 0;
    }
    else
    {
        die "couldnt fork: $!\n";
    }
}
while (keys %child)
{
    my $pid = waitpid(-1,0);
    delete $child{$pid};
}
my $stop = Time::HiRes::time();
my $duration = $stop-$start;
print "Time spent: $duration\n";
print "Size of array: ".scalar(@array)."\n";
print "End of main program\n";

sub addToArray
{
    my $start=shift;
    my $count=${$start};
    ${$start}+=10;
    my $array=shift;
    for (my $i=$count; $i<$count +10; $i++)
    {
        push @{$array}, $i;
    }
    print scalar(@{$array})."\n";
}
NB: I used push in lieu of ${$array}[$i]=$i, because I realized that my $counter wasn't actually updating, so that would never work with this code.
I assume that this doesn't work because the children are all copies of the original program, so I'm never actually adding anything to the array in my "original program". On that note, I'm very stuck. Again, the problem I'm actually trying to solve is how to partition my array (with data in it), read the partitions in parallel, and return a computation based on what I read (NOTE: I'm not going to modify the original array), but I'm never going to be able to do that if I can't figure out how to get my $counter to update. I'd also like to know how to get the code above to do what I want it to do, but that's a secondary goal.
Once I can get my counter to update correctly, is there any chance that another process would start before it updates and I wouldn't actually be reading in the entire array? If so, how do I account for this?
Please, any help would be much appreciated. I'm very frustrated/stuck. I hope there is an easy fix. Thanks in advance.
EDIT: I attempted to use Parallel::ForkManager, but to no avail:
#!/usr/local/roadm/bin/perl
use strict;
use warnings;
use Time::HiRes;
use Parallel::ForkManager;

my $pm = Parallel::ForkManager->new(10);
for (my $count = 1; $count <= 10; $count++)
{
    my $pid = $pm->start and next;
    sub1(\$counter,\@array);
    $pm->finish; # Terminates the child process
}
$pm->wait_all_children;
I didn't include the other extraneous stuff; see above for the missing code/sub. Again, help would be much appreciated. I'm very new to this and kind of need someone to hold my hand. I also tried to do something with run_on_start and run_on_finish, but they didn't work either.
Upvotes: 3
Views: 535
Reputation: 57656
Your code has two issues: your child processes share no data, and you would have a race condition if the forked processes did share data. The solution is to use threads. Any possibility of race conditions can be eliminated by partitioning the data in the parent thread and, of course, by not using shared data.
Threads in Perl behave similarly to forking: by default, there is no shared memory. This makes using threads quite easy. However, each thread runs its own perl interpreter, which makes threads quite costly. Use them sparingly.
First, we have to activate threading support via use threads. To start a thread, we call threads->create(\&code, @args), which returns a thread object. The code will then run in a separate thread and will be invoked with the given arguments. After the thread has finished execution, we can collect the return value by calling $thread->join. Note: the context of the threaded code is determined by the create method, not by join.
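To make the point about context concrete, here is a minimal sketch. The sub name min_max and the sample numbers are made up for illustration, and it assumes a perl that was built with thread support.

```perl
use strict;
use warnings;
use threads;

# Hypothetical worker: returns two values, so the calling context matters.
sub min_max {
    my @sorted = sort { $a <=> $b } @_;
    return ($sorted[0], $sorted[-1]);
}

# The parentheses around $thr make this a list assignment, so create is
# called in list context and join hands back both return values.
my ($thr) = threads->create(\&min_max, 5, 3, 9, 1);
my @both = $thr->join;

# Alternative: request list context explicitly through the options hash.
my $thr2 = threads->create({ context => 'list' }, \&min_max, 5, 3, 9, 1);
my @both2 = $thr2->join;

print "min/max: @both\n";
```

Calling join in list context does not help if create was called in scalar context, which is why the context is fixed at creation time in both variants.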
We could mark variables with the :shared attribute. Your $counter and @array would be examples of this, but it is generally better to pass explicit copies of data around than to use shared state (disclaimer: from a theoretical standpoint, that is). To avoid race conditions with the shared data, you'd actually have to protect your $counter with a semaphore, but again, there is no need for shared state.
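For completeness, this is roughly what a shared counter would look like if you did want one. Note that this sketch uses lock() from threads::shared rather than a semaphore (Thread::Semaphore would be the semaphore route); the sub name bump and the iteration counts are arbitrary choices for the example.

```perl
use strict;
use warnings;
use threads;
use threads::shared;

my $counter :shared = 0;

# Hypothetical worker: increments the shared counter $times times.
sub bump {
    my ($times) = @_;
    for (1 .. $times) {
        lock($counter);   # lock is released at the end of each loop iteration
        $counter++;
    }
    return;
}

my @workers = map { threads->create(\&bump, 1000) } 1 .. 4;
$_->join for @workers;

print "counter: $counter\n";
```

Without the lock, the read-increment-write sequence of two threads can interleave and updates can be lost, which is the race condition mentioned above.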
Here is a toy program showing how you could use threads to parallelize a calculation:
use strict;
use warnings;
use threads;
use 5.010; # for `say`, and sane threads
use Test::More;

# This program calculates differences between elements of an array
my @threads;
my @array = (1, 4, 3, 5, 5, 10, 7, 8);
my @delta = ( 3, -1, 2, 0, 5, -3, 1 );
my $number_of_threads = 3;
my @partitions = partition( $#array, $number_of_threads );
say "partitions: @partitions";

for (my $lower_bound = 0; @partitions; $lower_bound += shift @partitions) {
    my $upper_bound = $lower_bound + $partitions[0];
    say "spawning thread with [@array[$lower_bound .. $upper_bound]]";
    # pass copies of the values in the array slice to new thread:
    push @threads, threads->create(\&differences, @array[$lower_bound .. $upper_bound]);
    # note that threads->create was called in list context
}

my @received;
push @received, $_->join for @threads; # will block until all are finished
is_deeply \@received, \@delta;
done_testing;

# calculates the differences. This doesn't need shared memory.
# note that @array could have been safely accessed, as it is never written to
# If I had written to an (unshared) variable, these changes would have been thread-local
sub differences {
    say "Hi from a worker thread, I have ", 0+@_, " elements to work on";
    return map $_[$_] - $_[$_-1], 1 .. $#_;
    # or more readable:
    # my @d;
    # for my $i (1 .. $#_) {
    #     push @d, $_[$i] - $_[$i-1];
    # }
    # return @d;
}

# divide workload into somewhat fair parts, giving earlier threads more work
sub partition {
    my ($total, $parts) = @_;
    my $base_size = int($total / $parts);
    my @partitions = ($base_size) x $parts;
    $partitions[$_-1]++ for 1 .. $total - $base_size*$parts;
    return @partitions;
}
A note on the number of threads: this should depend on the number of processors in your system. If you have four cores, running more than four threads on CPU-bound work doesn't make much sense.
Upvotes: 5
Reputation: 14709
So, after my struggle, here's the fix:
EDIT: THIS DOES NOT ACCOMPLISH WHAT I WANTED TO DO
#!/usr/local/roadm/bin/perl
use strict;
use warnings;
use Time::HiRes;
use Parallel::ForkManager;

print "Starting main program\n";
my @array=();
my $counter=0;
my $start = Time::HiRes::time();
my $max_processes=20;
my $partition=10;
my $max_elements=100;
my $pm = Parallel::ForkManager->new($max_processes);
$pm->run_on_start( sub {
    my ($pid, $exit_code, $ident) = @_;
    sub1(\$counter,\@array);
});
while ($counter < $max_elements)
{
    my $pid = $pm->start and next;
    $pm->finish; # Terminates the child process
}
$pm->wait_all_children;
my $stop = Time::HiRes::time();
my $duration = $stop-$start;
print "Time spent: $duration\n";
print "Size of array: ".scalar(@array)."\n";
print "\nEnd of main program\n";

sub sub1 {
    my $start=shift;
    my $count=${$start};
    ${$start}+=$partition;
    my $array=shift;
    for (my $i=$count; $i<$count + $partition; $i++)
    {
        push @{$array}, $i;
    }
    return @{$array};
}
Upvotes: 0
Reputation: 755114
If you're going to use child processes after forking, each child process is autonomous and has its own copy of the program's data as of the time it was forked from the main program. Changes made by the child in its own memory have no effect on the parent's memory. If you need that, either you need a Perl built with thread support and to use threads, or you need to think again, maybe using shared memory, though placing Perl data in shared memory might be tricky.
So, one option is to read all the data into memory before forking off and having the children work on their own copies of the data.
Depending on the structure of the problem, another possibility might be to have each child read and work on a portion of the data. This won't work if each child must have access to all the data.
It isn't clear how much speed up you'll get through threading or forking if the threads or processes are all tied up reading the same file. Getting the data into memory may be best treated as a single-threaded (single-tasking) operation; the parallelism can spring into effect — and yield benefits — once the data is in memory.
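As an illustration of the first option (load the data, then fork), here is a minimal sketch using only core Perl. Each child works on its own copy of the data and writes one partial result back to the parent through a pipe. The sample data, the worker count of 4, and the summing computation are placeholders chosen for the example, not anything from the original program.

```perl
use strict;
use warnings;

my @data    = (1 .. 100);   # sample data, loaded before forking
my $workers = 4;            # arbitrary worker count for this sketch
my $chunk   = int((@data + $workers - 1) / $workers);   # ceiling division

my @readers;
for my $w (0 .. $workers - 1) {
    my $lo = $w * $chunk;
    my $hi = $lo + $chunk - 1;
    $hi = $#data if $hi > $#data;

    pipe(my $reader, my $writer) or die "pipe failed: $!";
    my $pid = fork();
    die "could not fork: $!" unless defined $pid;

    if ($pid == 0) {
        # Child: read a slice of the inherited copy and report one number.
        close $reader;
        my $sum = 0;
        $sum += $_ for @data[$lo .. $hi];
        print {$writer} "$sum\n";
        close $writer;
        exit 0;
    }

    # Parent: keep the read end, close the write end.
    close $writer;
    push @readers, [$pid, $reader];
}

# Collect one line from each child, then reap it.
my $total = 0;
for my $r (@readers) {
    my ($pid, $fh) = @$r;
    my $partial = <$fh>;
    close $fh;
    waitpid($pid, 0);
    chomp $partial;
    $total += $partial;
}
print "total: $total\n";
```

The parent never sees the child's variables; only what the child explicitly writes to the pipe comes back. For larger results you would need to serialize the data structure yourself before writing it.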
Upvotes: 3
Reputation: 13792
There are some CPAN modules that make your life easier. One of them is Parallel::ForkManager, which is a simple parallel processing fork manager.
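A minimal sketch of how a child can hand a result back to the parent with this module, assuming a reasonably recent Parallel::ForkManager (one where finish accepts a data reference as its second argument). The sample data, chunk size, and summing computation are placeholders for the example.

```perl
use strict;
use warnings;
use Parallel::ForkManager;

my @data  = (1 .. 100);   # sample data for this sketch
my $chunk = 10;           # partition size, arbitrary
my %partial;              # filled in by the parent only

my $pm = Parallel::ForkManager->new(4);

# Runs in the parent each time a child has been reaped.
$pm->run_on_finish(sub {
    my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $result) = @_;
    $partial{$ident} = $$result if defined $result;
});

for (my $lo = 0; $lo < @data; $lo += $chunk) {
    my $hi = $lo + $chunk - 1;
    $hi = $#data if $hi > $#data;

    $pm->start($lo) and next;   # parent moves on; $lo serves as the ident

    # Child: compute on its own copy of the slice.
    my $sum = 0;
    $sum += $_ for @data[$lo .. $hi];
    $pm->finish(0, \$sum);      # child exits and sends \$sum to the parent
}
$pm->wait_all_children;

my $total = 0;
$total += $_ for values %partial;
print "total: $total\n";
```

The partitioning happens in the parent before each start call, so no counter has to be shared between processes. The children only receive their bounds and send back a result.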
Upvotes: 0