Reputation: 3715
I have a recursive function that calls a system command to list files and directory. For each directory it will call itself again.
This process could take a while. This is why I would like to run parallel jobs.
I was looking into ForkManager, but it would not allow creating new sub-forks. As the number of subprocesses should be limited to 10, I was thinking of a 'worker' concept: having 10 workers waiting for jobs to be executed.
My recursive function:
# Recursively list the contents of $_dir via the external $omnidb command,
# storing the files found for each directory in the global %data hash.
# Recursion into subdirectories is controlled by the global $recursive flag,
# and the total number of processed items is capped by $maxNumberOfItems.
sub pullDataFromDbWithDirectory {
    my $_dir = $_[0];
    my @list = ();
    # BUG FIX: 'lt' is a *string* comparison ("9" lt "10" is false); the
    # item counter and the cap are numbers, so numeric '<' is required.
    if ($itemCount < $maxNumberOfItems) {
        # Strip the leading "Dir "/"File " keyword and the trailing newline
        # from each output line, keeping only dir/file entries.
        # NOTE(review): $_dir and $label are interpolated into a shell
        # command; a name containing a quote would break it — consider
        # list-form IPC (open '-|', $omnidb, @args) instead.
        my @retval = grep { /dir|file/ } map { s/^Dir\s+|^File\s+|\n//g; $_ } qx($omnidb -filesystem $filesystem '$label' -listdir '$_dir');
        foreach my $item (@retval) {
            $itemCount++;
            push(@list, $item) if $item =~ /^file/;
            if ($item =~ /^dir/) {
                my $subdir = "$_dir/$item";
                # NOTE(review): assigning an empty list to a hash element
                # stores undef; kept as-is to preserve behavior.
                $data{$subdir} = ();
                if ($recursive) {
                    pullDataFromDbWithDirectory($subdir);
                }
            }
        }
        $data{$_dir} = \@list;
    }
}
Any help would be much appreciated.
Update:
The problem is solved. Thanks for the input. I modified my code:
# List the contents of $_dir via the external $omnidb command, append every
# entry's full path to the shared @data list, and enqueue each directory so
# a worker thread picks it up for further listing.
sub pullDataFromDbWithDirectory {
    my ($_dir) = @_;

    # Respect the global cap on the number of processed items.
    return if $itemCount > $maxNumberOfItems;

    # Output lines look like "Dir name" / "File name"; drop the leading
    # type keyword and the trailing newline, keep only dir/file entries.
    my @entries = grep { /dir|file/ }
                  map  { s/^Dir\s+|^File\s+|\n//g; $_ }
                  qx($omnidb -filesystem $filesystem '$label' -listdir '$_dir');

    for my $item (@entries) {
        $itemCount++;
        my $file = "$_dir/$item";
        push @data, $file;
        next unless $item =~ /^dir/;
        $worker->enqueue($file);
        print "Add $file to queue\n" if $debug;
    }
}
# Worker thread body: keep pulling directory paths off the shared $worker
# queue and listing them, until a false value (the undef terminator) is
# dequeued.
sub doOperation () {
    my $ithread = threads->tid();
    for (;;) {
        my $folder = $worker->dequeue();
        last unless $folder;
        print "Read $folder from queue\n" if $debug;
        pullDataFromDbWithDirectory($folder);
    }
}
# Spawn the worker pool, seed the queue by scanning the top directory, then
# push one undef per worker as a shutdown marker and wait for all of them.
my @threads;
push @threads, threads->create(\&doOperation) for 1 .. $maxNumberOfParallelJobs;
pullDataFromDbWithDirectory($directory);
$worker->enqueue(undef) for 1 .. $maxNumberOfParallelJobs;
$_->join for @threads;
Upvotes: 2
Views: 293
Reputation: 5069
I would rewrite your code to use an appropriate Perl module, like File::Find; it is much more effective.
use File::Find;

my %data;
find(\&wanted, @directories_to_search);

# Called by File::Find once per entry visited; $_ holds the base name and
# $File::Find::dir the directory containing it.
sub wanted {
    # BUG FIX: a plain assignment keeps only the *last* entry seen in each
    # directory; push onto an array ref so every entry is collected.
    push @{ $data{$File::Find::dir} }, $_;
}
For parallel operation I would use Thread::Queue, like this:
use strict;
use warnings;
use threads;
use threads::shared;    # BUG FIX: required for the ':shared' attribute below
use Thread::Queue;

my $q = Thread::Queue->new();    # A new empty queue

# Shared across all worker threads so each file is processed only once.
my %seen :shared;

# BUG FIX: 'my @thrs = threads->create(...) for 1..5' re-declares @thrs on
# every iteration, so it ends up holding only a single thread (and the
# terminator/join loops below would then be wrong). Build the list with map.
my @thrs = map { threads->create(\&doOperation) } 1 .. 5;    # 5 worker threads

add_file_to_q('/tmp/');

# One end-of-work marker per worker so every thread can exit its loop.
$q->enqueue('//_DONE_//') for @thrs;
$_->join() for @thrs;
# List the entries of $dir (one per line, via `ls -1`) and enqueue each of
# them for the worker threads to process.
# NOTE(review): interpolating $dir into backticks is shell-injection prone;
# opendir/readdir would be both safer and faster.
sub add_file_to_q {
    my $dir = shift;
    my @files = `ls -1 $dir/`;
    chomp(@files);
    # add files to queue
    foreach my $f (@files) {
        # Send work to the thread
        $q->enqueue($f);
        # BUG FIX: the original line was missing the concatenation dot
        # after the string literal, which is a syntax error.
        print "Pending items: " . $q->pending() . "\n";
    }
}
# Worker thread body: process queued names until the '//_DONE_//' marker
# arrives, recursing into any directory encountered.
sub doOperation () {
    my $ithread = threads->tid();
    # BUG FIX: test dequeue() with defined(); the bare truth test would
    # silently terminate the worker on a file literally named '0' or ''.
    # (The original 'sleep(1) if !defined $filename' inside the loop was
    # unreachable dead code and has been removed.)
    while (defined(my $filename = $q->dequeue())) {
        # Do work on $item
        return 1 if $filename eq '//_DONE_//';
        # NOTE(review): this check-then-set on %seen is not atomic; two
        # threads may race on the same name — wrap in lock(%seen) if exact
        # once-only processing matters.
        next if $seen{$filename};
        print "[id=$ithread]\t$filename\n";
        $seen{$filename} = 1;
        ### add files if it is a directory (check with symlinks, no file with //_DONE_// name!)
        add_file_to_q($filename) if -d $filename;
    }
    return 1;
}
Upvotes: 2