Tsaari

Reputation: 103

Perl Multithreading - worker threads stop working

I have the following perl code which makes asynchronous calls to two external bioinformatics programs. First, it runs a blastJob, and then it takes the results from that and runs an exonerateJob. I've adapted this code from a previous question about moving my code to a multi-threaded approach.

The problem is frustrating because it only happens after several hours of running. I will leave the program to run overnight and find in the morning that the exonerateJobs are no longer running, but new blastJobs are still cranking through. There are no reported errors or anything. Another tidbit of information is that I've gone back and tested the input queries where the logs show that the exonerateJobs stopped working. The program completes just fine if I run a small number of queries through it, even if they're the queries that seemed to have caused problems previously. Since I'm not too familiar with the rules of multithreading, I'd like to know if there's a problem with my approach, or if it's potentially a problem with the external programs that are called. Here's the bit of code:

#Asynchronous calls to blast and exonerate
{
   my $blast_request_queue = Thread::Queue->new();
   my $exonerate_request_queue = Thread::Queue->new();

   my @blast_threads;
   for (1..NUM_BLAST_WORKERS) {
      push @blast_threads, async {
         while (my $q = $blast_request_queue->dequeue()) {
            my @results = blastJob($q, $blastopts_ref);
            foreach (@results) {
               my @args = ($q, $_);
               $exonerate_request_queue->enqueue(\@args);
            }
         }

         $exonerate_request_queue->end(); # I've tried with and without this line, the result seems to be the same
      };
   }

   my @exonerate_threads;
   for (1..NUM_EXONERATE_WORKERS) {
      push @exonerate_threads, async {
         while (my $args_ref = $exonerate_request_queue->dequeue()) {
            my ($queryFile, $targetName) = @$args_ref; #De-reference args
            my $regex = qr/\Q$targetName\E/;
            #Check for target file
            my ($file_match) = grep { $_ =~ $regex } keys %targets;
            if ($file_match) {
                my $targetFile = $options{'t'} . $file_match;
                my $result = exonerateJob($queryFile, $targetFile, $exonopts_ref);
                #Print result to file after job is finished
                my ($Qname, $Qpath, $Qsuffix) = fileparse($queryFile);
                my $outFN = $Qname . ".exonerate_out";
                open(OUTFH, ">>$outFN") or print STDERR "Can't open $outFN: $!";
                print OUTFH $result;
            } else {
                print STDERR "Target file not found: $targetName. Can't run exonerate";
            }
         }
      };
   }

    foreach (@queries) {
        #Concatenate query path with name
        my $queryFile = $options{'q'} . $_;
        $blast_request_queue->enqueue($queryFile);
    }
    #my $queryFile = $options{'q'} . $queries[3];
    #$blast_request_queue->enqueue($queryFile);

   $blast_request_queue->end();    
   $_->join() for @blast_threads;
   $exonerate_request_queue->end();
   $_->join() for @exonerate_threads;
}

#I'm using IPC::Run to launch the programs.
#There is some error handling which I believe should catch any problems
sub blastJob {
    my ($query, $blastopts_ref) = @_;
    #De-reference blast options
    my @blastCmd = @$blastopts_ref;
    my ($blastOut, $err); #for blast output
    #Add query information after first blast option
    splice(@blastCmd, 1, 0, ("-query", $query));
    my ($Qname, $Qpath, $Qsuffix) = fileparse($query);
    print "Running $blastCmd[0]: query $Qname...\n";
    run \@blastCmd, \undef, \$blastOut, \$err;
    if ($err) {
        print "Error in BLAST query $Qname. $err\n";
    }
    my @results = split("\n", $blastOut);
    return uniq(@results);
}

sub exonerateJob {
    my ($query, $target, $exonopts_ref) = @_;
    #De-reference exonerate options
    my @exonCmd = @$exonopts_ref;
    my ($exonOut, $err); #for exonerate output
    #Add program, query, and target information to exonerate options
    unshift (@exonCmd, ("exonerate", "-q", $query, "-t", $target));
    my ($Qname, $Qpath, $Qsuffix) = fileparse($query);
    my ($Tname, $Tpath, $Tsuffix) = fileparse($target);
    eval {
        print "Running exonerate: query $Qname, target $Tname...\n";
        run \@exonCmd, \undef, \$exonOut, \$err, timeout(240);
        if ($err) {
            print STDERR "Error in exonerate query $Qname, target $Tname. $err\n";
        }
    };
    if ($@ =~ /timeout/) {
        print STDERR "Error: timeout in exonerate query $Qname, target $Tname\n";
    }

    return $exonOut;
}

Upvotes: 0

Views: 78

Answers (2)

ikegami

Reputation: 385789

You've gone to a lot of trouble to avoid die, when you should simply have added an eval BLOCK around the worker code.

Change

my $result = job1($job);
$job2_request_queue->enqueue($result);

to

my $result = eval { job1($job) };
if ($@) {
   warn("Job failed: $@");
} else {
   $job2_request_queue->enqueue($result);
}

This is far more reliable. For example, run can throw an exception, which will kill your worker thread.
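To make the pattern above concrete, here is a minimal self-contained sketch of a worker loop that survives a failing job. The names flaky_job, $in and $out are made up for the illustration (they are not from the question), and the failure is simulated with die rather than a real IPC::Run call. It assumes a perl built with thread support and a Thread::Queue recent enough to have end().

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

# Hypothetical job: dies on one input to simulate run() throwing.
sub flaky_job {
    my ($n) = @_;
    die "simulated failure on $n\n" if $n == 2;
    return $n * 10;
}

my $in  = Thread::Queue->new(1 .. 4);   # input queue, pre-filled
my $out = Thread::Queue->new();         # results from the worker
$in->end();                             # nothing more will be queued

my $worker = async {
    while (defined(my $n = $in->dequeue())) {
        # The eval keeps an exception from terminating the whole thread.
        my $result = eval { flaky_job($n) };
        if ($@) {
            warn "Job $n failed: $@";
            next;
        }
        $out->enqueue($result);
    }
};

$worker->join();
$out->end();

# Drain whatever the worker managed to produce.
my @results;
while (defined(my $r = $out->dequeue())) {
    push @results, $r;
}
print "Completed: @results\n";
```

The job for input 2 fails and is logged, but the same thread carries on with inputs 3 and 4. Without the eval, the thread would have ended at the first die and the remaining items would never have been picked up by it.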


Also, as Sobrique mentioned, the topmost instance of $exonerate_request_queue->end(); should not have been added. That call prevents further work from being added to the queue (and signals the exonerate workers to exit once all the work currently in the queue has been performed). It should only be made after every blast worker has exited, but with this addition it is made as soon as the first blast worker exits.
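The shutdown order described above can be sketched as a small two-stage pipeline. This is only an illustration under assumptions: the queue names, the worker counts and the trivial "jobs" (string manipulation instead of blast and exonerate) are all invented, and $done_q exists only so the result can be counted at the end.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $stage1_q = Thread::Queue->new();
my $stage2_q = Thread::Queue->new();
my $done_q   = Thread::Queue->new();   # only here to count the output

my @stage1;
for (1 .. 3) {
    push @stage1, async {
        while (defined(my $item = $stage1_q->dequeue())) {
            # Each stage-1 job produces two stage-2 jobs.
            $stage2_q->enqueue("$item-a", "$item-b");
        }
        # Deliberately no $stage2_q->end() here: other stage-1
        # workers may still be producing.
    };
}

my @stage2;
for (1 .. 2) {
    push @stage2, async {
        while (defined(my $item = $stage2_q->dequeue())) {
            $done_q->enqueue(uc $item);
        }
    };
}

$stage1_q->enqueue(1 .. 20);
$stage1_q->end();            # no more stage-1 input
$_->join() for @stage1;      # wait until every producer has finished
$stage2_q->end();            # only now is it safe to close stage 2
$_->join() for @stage2;
$done_q->end();

my $count = 0;
$count++ while defined($done_q->dequeue());
print "Stage 2 processed $count items\n";
```

The only place the second queue is ended is the main thread, after all stage-1 threads have been joined, which mirrors the last four lines of the block in the question.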

Upvotes: 1

Sobrique

Reputation: 53478

Without your source data I can't test it, but my money would be on the first place you call:

$exonerate_request_queue->end();

In that async block.

I think it's quite possible that as soon as you call

$blast_request_queue->end();

a blast thread will exit soon after and close the 'output' queue, and in doing so you lose anything that was still pending, because the queue is closed.
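A quick way to check what an ended queue does is a small sketch like the one below (the item names are invented). As far as I understand Thread::Queue, items already in the queue at the time of end() can still be dequeued, while a later enqueue throws an exception. In the question's code that exception would be raised inside a blast worker that is still running, so its remaining results would never reach the exonerate queue.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $q = Thread::Queue->new();
$q->enqueue('pending-1', 'pending-2');
$q->end();    # what the first exiting blast worker does to the output queue

# A worker that is still running now tries to add more work.
my $ok    = eval { $q->enqueue('late'); 1 };
my $error = $ok ? '' : $@;
print "Late enqueue failed: $error" unless $ok;

# Items queued before end() are still handed out.
my @drained;
while (defined(my $item = $q->dequeue())) {
    push @drained, $item;
}
print "Still delivered: @drained\n";
```

If this behaves the same on your Perl, wrapping the worker body in eval and logging $@ should make the failure visible instead of silent.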

Upvotes: 1
