Reputation: 103
I have the following perl code which makes asynchronous calls to two external bioinformatics programs. First, it runs a blastJob, and then it takes the results from that and runs an exonerateJob. I've adapted this code from a previous question about moving my code to a multi-threaded approach.
The problem is frustrating because it only happens after several hours of running. I will leave the program to run overnight and find in the morning that the exonerateJobs are no longer running, but new blastJobs are still cranking through. There are no reported errors or anything. Another tidbit of information is that I've gone back and tested the input queries where the logs show that the exonerateJobs stopped working. The program completes just fine if I run a small number of queries through it, even if they're the queries that seemed to have caused problems previously. Since I'm not too familiar with the rules of multithreading, I'd like to know if there's a problem with my approach, or if it's potentially a problem with the external programs that are called. Here's the bit of code:
#Asynchronous calls to blast and exonerate
{
    my $blast_request_queue     = Thread::Queue->new();
    my $exonerate_request_queue = Thread::Queue->new();

    my @blast_threads;
    for (1..NUM_BLAST_WORKERS) {
        push @blast_threads, async {
            while (my $q = $blast_request_queue->dequeue()) {
                my @results = blastJob($q, $blastopts_ref);
                foreach (@results) {
                    my @args = ($q, $_);
                    $exonerate_request_queue->enqueue(\@args);
                }
            }
            $exonerate_request_queue->end(); # I've tried with and without this line, the result seems to be the same
        };
    }

    my @exonerate_threads;
    for (1..NUM_EXONERATE_WORKERS) {
        push @exonerate_threads, async {
            while (my $args_ref = $exonerate_request_queue->dequeue()) {
                my ($queryFile, $targetName) = @$args_ref; #De-reference args
                my $regex = qr/\Q$targetName\E/;
                #Check for target file
                my ($file_match) = grep { $_ =~ $regex } keys %targets;
                if ($file_match) {
                    my $targetFile = $options{'t'} . $file_match;
                    my $result = exonerateJob($queryFile, $targetFile, $exonopts_ref);
                    #Print result to file after job is finished
                    my ($Qname, $Qpath, $Qsuffix) = fileparse($queryFile);
                    my $outFN = $Qname . ".exonerate_out";
                    open(OUTFH, ">>$outFN") or print STDERR "Can't open $outFN: $!";
                    print OUTFH $result;
                } else {
                    print STDERR "Target file not found: $targetName. Can't run exonerate";
                }
            }
        };
    }

    foreach (@queries) {
        #Concatenate query path with name
        my $queryFile = $options{'q'} . $_;
        $blast_request_queue->enqueue($queryFile);
    }
    #my $queryFile = $options{'q'} . $queries[3];
    #$blast_request_queue->enqueue($queryFile);
    $blast_request_queue->end();
    $_->join() for @blast_threads;

    $exonerate_request_queue->end();
    $_->join() for @exonerate_threads;
}
#I'm using IPC::Run to launch the programs.
#There is some error handling which I believe should catch any probs
sub blastJob {
    my ($query, $blastopts_ref) = @_;
    #De-reference blast options
    my @blastCmd = @$blastopts_ref;
    my ($blastOut, $err); #for blast output
    #Add query information after first blast option
    splice(@blastCmd, 1, 0, ("-query", $query));
    my ($Qname, $Qpath, $Qsuffix) = fileparse($query);
    print "Running $blastCmd[0]: query $Qname...\n";
    run \@blastCmd, \undef, \$blastOut, \$err;
    if ($err) {
        print "Error in BLAST query $Qname. $err\n";
    }
    my @results = split("\n", $blastOut);
    return uniq(@results);
}

sub exonerateJob {
    my ($query, $target, $exonopts_ref) = @_;
    #De-reference exonerate options
    my @exonCmd = @$exonopts_ref;
    my ($exonOut, $err); #for exonerate output
    #Add program, query, and target information to exonerate options
    unshift(@exonCmd, ("exonerate", "-q", $query, "-t", $target));
    my ($Qname, $Qpath, $Qsuffix) = fileparse($query);
    my ($Tname, $Tpath, $Tsuffix) = fileparse($target);
    eval {
        print "Running exonerate: query $Qname, target $Tname...\n";
        run \@exonCmd, \undef, \$exonOut, \$err, timeout(240);
        if ($err) {
            print STDERR "Error in exonerate query $Qname, target $Tname. $err\n";
        }
    };
    if ($@ =~ /timeout/) {
        print STDERR "Error: timeout in exonerate query $Qname, target $Tname\n";
    }
    return $exonOut;
}
Upvotes: 0
Views: 78
Reputation: 385789
You've gone to a lot of trouble to avoid die, when you should simply have added an eval BLOCK around the worker code.
Change

my $result = job1($job);
$job2_request_queue->enqueue($result);

to

my $result = eval { job1($job) };
if ($@) {
    warn("Job failed: $@");
} else {
    $job2_request_queue->enqueue($result);
}
This is far more reliable. For example, run can throw an exception, which will kill your worker thread.
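A self-contained sketch of that pattern (risky_job here is a hypothetical stand-in for blastJob, since I can't run your pipeline): the job that dies is logged and skipped, and the worker loop survives to process the remaining queue.

```perl
use strict;
use warnings;
use Thread::Queue;

my $in  = Thread::Queue->new();
my $out = Thread::Queue->new();

# Hypothetical stand-in for blastJob: dies for some inputs, the way
# IPC::Run's run() can throw.
sub risky_job {
    my ($q) = @_;
    die "boom: $q\n" if $q eq 'bad';
    return ("$q-hit1", "$q-hit2");
}

$in->enqueue($_) for qw(ok bad also_ok);
$in->end();

# Worker loop: the eval keeps one failing job from killing the worker.
while (defined(my $q = $in->dequeue())) {
    my @results = eval { risky_job($q) };
    if ($@) {
        warn "Job '$q' failed: $@";
        next;
    }
    $out->enqueue($_) for @results;
}
$out->end();

my $n = 0;
$n++ while defined $out->dequeue();
print "$n results\n";   # the 'bad' job is skipped, not fatal
```

Without the eval, the die would propagate out of the while loop and that worker would silently stop dequeuing, which matches the "exonerateJobs quietly stop" symptom.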
Also, as Sobrique mentioned, the topmost instance of $exonerate_request_queue->end(); should not be there. end() prevents further work from being added to the queue (and signals the exonerate workers to exit once all the work currently in the queue has been performed). That should only happen after every blast worker has exited, but placing the call inside the worker causes it to happen as soon as the first blast worker exits.
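To make the ordering concrete, here is a small self-contained sketch with generic stage1/stage2 workers standing in for your blast/exonerate workers: the downstream queue is ended only after every upstream thread has been joined, so no pending work is dropped.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $stage1_q = Thread::Queue->new();
my $stage2_q = Thread::Queue->new();
my $done     = Thread::Queue->new();   # collects stage-2 results

my @stage1 = map {
    threads->create(sub {
        while (defined(my $item = $stage1_q->dequeue())) {
            # each stage-1 job produces two stage-2 jobs
            $stage2_q->enqueue("$item-a", "$item-b");
        }
        # NOTE: no $stage2_q->end() here -- calling it in the worker
        # closes the queue as soon as the FIRST stage-1 thread exits
    });
} 1 .. 2;

my @stage2 = map {
    threads->create(sub {
        while (defined(my $item = $stage2_q->dequeue())) {
            $done->enqueue($item);
        }
    });
} 1 .. 2;

$stage1_q->enqueue($_) for qw(q1 q2 q3);
$stage1_q->end();              # no more stage-1 work will be added

$_->join() for @stage1;        # wait for ALL stage-1 workers first
$stage2_q->end();              # only now is stage 2's input complete
$_->join() for @stage2;

$done->end();
my $n = 0;
$n++ while defined $done->dequeue();
print "$n results\n";          # all six results arrive; none are dropped
</```

This requires a threads-enabled perl, like the original code. The only structural change from the question is that end() on the downstream queue moved from inside the upstream worker to after the upstream join loop.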
Upvotes: 1
Reputation: 53478
Without your source data I can't test it, but my money would be on the first call to

$exonerate_request_queue->end();

in that async block.

Because I think it's quite possible that, as soon as you call

$blast_request_queue->end();

a blast thread will exit soon after and close the 'output' queue, and in doing so you lose anything that was still pending, because the queue is closed.
Upvotes: 1