Chris

Reputation: 3715

Perl: Run recursive jobs in parallel

I have a recursive function that calls a system command to list files and directories. For each directory it calls itself again.

This process can take a while, which is why I would like to run the jobs in parallel.

I was looking into Parallel::ForkManager, but it does not allow forking new children from within a child. Since the number of subprocesses should be limited to 10, I was thinking of a 'worker' concept: 10 workers waiting for jobs to execute.
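The worker idea described above can be sketched with Thread::Queue: a fixed pool of threads blocks on a shared queue, and one `undef` per worker ends the loop. This is a minimal illustration only; the names (`$queue`, `%results`, `NUM_WORKERS`) and the squaring "job" are stand-ins, not part of the original code.

```perl
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;

use constant NUM_WORKERS => 10;   # illustrative worker limit

my $queue = Thread::Queue->new();
my %results :shared;              # shared so all workers can write to it

sub worker {
    # Each worker blocks on dequeue() until a job (or undef) arrives.
    while (defined(my $job = $queue->dequeue())) {
        $results{$job} = $job * $job;   # stand-in for the real work
    }
}

my @workers = map { threads->create(\&worker) } 1 .. NUM_WORKERS;

$queue->enqueue($_) for 1 .. 20;        # submit jobs
$queue->enqueue(undef) for @workers;    # one undef per worker ends its loop
$_->join() for @workers;

print "$_ -> $results{$_}\n" for sort { $a <=> $b } keys %results;
```

The termination convention (enqueue as many `undef`s as there are workers) only works if jobs are all enqueued before the `undef`s, which is exactly the subtlety a recursive producer has to deal with.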

My recursive function:

sub pullDataFromDbWithDirectory {
    my $_dir = $_[0];
    my @list = ();

    if ($itemCount < $maxNumberOfItems) {   # numeric <, not string lt
        my @retval = grep { /dir|file/ } map { s/^Dir\s+|^File\s+|\n//g; $_ } qx($omnidb -filesystem $filesystem  '$label'  -listdir '$_dir');

        foreach my $item (@retval) {
            $itemCount++;

            push(@list,$item) if $item =~ /^file/;

            if ($item =~ /^dir/) {
                my $subdir = "$_dir/$item";
                $data{$subdir} = [];   # arrayref; assigning () would just store undef

                if ($recursive) {
                    pullDataFromDbWithDirectory($subdir);
                }
            }
        }

        $data{$_dir} = \@list;
    }
}

Any help would be much appreciated.

Update:

The problem is solved. Thanks for the input. I modified my code:

sub pullDataFromDbWithDirectory {
    my $_dir = $_[0];

    if ($itemCount <= $maxNumberOfItems) {
        my @retval = grep { /dir|file/ } map { s/^Dir\s+|^File\s+|\n//g; $_ } qx($omnidb -filesystem $filesystem  '$label'  -listdir '$_dir');

        foreach my $item (@retval) {
            $itemCount++;
            my $file = "$_dir/$item";
            push(@data,$file);

            if ($item =~ /^dir/) {
                $worker->enqueue($file);
                print "Add $file to queue\n" if $debug;
            }
        }
    }
}

sub doOperation {
    my $ithread = threads->tid();
    while (my $folder = $worker->dequeue()) {
        print "Read $folder from queue\n" if $debug;
        pullDataFromDbWithDirectory($folder);
    }
}

my @threads = map threads->create(\&doOperation), 1 .. $maxNumberOfParallelJobs;
pullDataFromDbWithDirectory($directory);
$worker->enqueue((undef) x $maxNumberOfParallelJobs);
$_->join for @threads;

Upvotes: 2

Views: 293

Answers (1)

user1126070

Reputation: 5069

I would rewrite your code to use an appropriate Perl module, like File::Find; it is much more effective.

use File::Find;

my %data;
find(\&wanted, @directories_to_search);

sub wanted {
    # Collect each entry under the directory it was found in.
    push @{ $data{$File::Find::dir} }, $_;
}

For parallel operation I would use Thread::Queue like this:

use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;

my $q = Thread::Queue->new();    # A new empty queue
my %seen :shared;

# Worker threads
my @thrs = map { threads->create(\&doOperation) } 1 .. 5;    # 5 threads
add_file_to_q('/tmp/');
$q->enqueue('//_DONE_//') for @thrs;
$_->join() for @thrs;

sub add_file_to_q {
  my $dir = shift;
  my @files = `ls -1 $dir/`;
  chomp(@files);
  # Add entries to the queue, prefixed with the directory so -d works later
  foreach my $f (@files) {
    # Send work to the threads
    $q->enqueue("$dir/$f");
    print "Pending items: " . $q->pending() . "\n";
  }
}

sub doOperation {
    my $ithread = threads->tid();
    while (my $filename = $q->dequeue()) {
      # Do work on $filename
      return 1 if $filename eq '//_DONE_//';
      next if $seen{$filename};
      print "[id=$ithread]\t$filename\n";
      $seen{$filename} = 1;
      ### add files if it is a directory (beware symlinks; no file may be named //_DONE_//!)
      add_file_to_q($filename) if -d $filename;
    }
    return 1;
}

Upvotes: 2
