Lisa
Lisa

Reputation: 331

Perl Multithreading - Perl exited with active threads: finished and unjoined

I am trying to create threads of a subroutine in my program (called Recombination). I have used the code below to create the threads, which I adapted from http://chicken.genouest.org/perl/multi-threading-with-perl/ I used this code as these threads are created in a loop, with the number of threads dependent upon the variable $ParentTally which is different for each loop (and $ParentTally can be up to 1000, and I didn't want to run 1000 threads at once)

my $nb_process = 20;
my $nb_compute = $ParentTally;
my $i=0;
my @running = ();
my @Threads;
my %NewPopulation;

while (scalar @Threads < $nb_compute) {
    @running = threads->list(threads::running);
    if (scalar @running < $nb_process) {
        my $Offspring= threads->new (sub {Recombination(\%Parent1Chromosome, \%Parent2Chromosome)});
        push (@Threads, $Offspring);
        my $tid = $Offspring->tid;
    }

    @running = threads->list(threads::running);
    foreach my $thr (@Threads) {
        if ($thr->is_running()) {
           my $tid = $thr->tid;
        }
       elsif ($thr->is_joinable()) {
          my $tid = $thr->tid;
          my $Offspring1=$thr->join();
          $NewPopulation{$Offspring1}{'Tally'}+=1;
       }
    }
    @running = threads->list(threads::running);
    $i++;
}

while (scalar @running != 0) {
     foreach my $thr (@Threads) {
      if ($thr->is_joinable()){
         my $Offspring1=$thr->join(); 
         $NewPopulation{$Offspring1}{'Tally'}+=1;
      }
    }
    @running = threads->list(threads::running);
}

(Note: $ParentTally is taken from another hash earlier in the code, my $ParentTally=$hashref->{'Tally'}; so this part of the program loops around with a different value for $ParentTally each time. %Parent1Chromosome & %Parent2Chromosome are created earlier in the program. The subroutine 'Recombination' is quite long so I haven't posted it, but it returns an integer. )

Often when running the program (although not always, a lot of the earlier code is dependent upon random variables so the program never runs the same) once it is finished I get 'Perl exited with active threads: 'number' finished and unjoined' ('number' varies depending upon the run). I thought that:

 while (scalar @running != 0) {
     foreach my $thr (@Threads) {
      if ($thr->is_joinable()){
         my $Offspring1=$thr->join(); 
         $NewPopulation{$Offspring1}{'Tally'}+=1;
      }
    }

would mean all threads would finish before moving onto the next section of code? What am I doing wrong? (I have never used threads before). I had looked into using http://www.perlmonks.org/?node_id=735931 but I didn't really understand how to use Thread::Queue and wasn't able to find a tutorial (and didn't understand the http://perldoc.perl.org/Thread/Queue.html ). Thanks

Upvotes: 0

Views: 5520

Answers (2)

beresfordt
beresfordt

Reputation: 5222

Not a fix for your code, but here's an outline for how I'd do it using queues (obviously needs some filling in to fit your purpose). There are many ways to improve on this if memory use is and issue - each thread spawned takes a full copy of all in scope variables; it's easy to hit memory problems when using threads

#!/usr/bin/perl

use strict ;
use threads ;
use Thread::Queue ;

my $threadCount = 2 ;

my $DataQueue = Thread::Queue->new() ;
my $ReportQueue = Thread::Queue->new() ;

my $threads = [] ;

# create pool of worker threads
for ( my $i = 0 ; $i<$threadCount ; $i ++ ){
    push( @$threads, threads->create( \&doStuff, $DataQueue, $ReportQueue ) ) ;
}

# array of data on which the threads have to work
my @array ;
# put work onto queue for threads to process
foreach my $workItem ( @array ){
   $DataQueue->enqueue( $workItem );
}

# enqueue undef for each worker to tell it no more work
# then wait for them all to join
$DataQueue->enqueue( (undef) x $threadCount ) ;
$_->join for @$threads ;

my %NewPopulation ;
# read the output of the threads from ReportQueue
while ( my $reportItem = $ReportQueue->dequeue() ){
    $NewPopulation{$reportItem}{'Tally'}++ ;
}

# display tallys
for my $offspring ( keys %NewPopulation ){
    print "Offspring $offspring Tally => " . $NewPopulation{$offspring}{'Tally'} . "\n" ;
}

sub doStuff{
    my ( $DataQueue, $ReportQueue ) = @_ ;

    while ( my $inputHash = $DataQueue->dequeue() ){
        my $result ;
        # do things here - the logic in your Recombination sub

        # return result to report queue
        $ReportQueue->enqueue($result) ;
    }

   # Enqueue undef to report queue so report thread knows we're done
   $ReportQueue->enqueue( undef ) ;

}

Upvotes: 4

Joel
Joel

Reputation: 3483

I believe the error is in the last while loop:

while (scalar @running != 0) {
     foreach my $thr (@Threads) {
      if ($thr->is_joinable()){
         my $Offspring1=$thr->join(); 
         $NewPopulation{$Offspring1}{'Tally'}+=1;
      }
    }
    @running = threads->list(threads::running);
}

According to the threads document, a call to is_joinable returns true only if a thread has finished running, hasn't been detached and hasn't been joined already. My guess is that when you get to this section you still have threads that are running, and so you skip over them. You might make another call, like you did in the previous while loop, to is_running to see if you still have threads running and handle the threads in some way.

Upvotes: 2

Related Questions