akhlesh
akhlesh

Reputation: 9

I want to auto increment queue in perl

I am trying below code but the queue is not incrementing automatically after thread 1 is complete it should check for new worker count and queue them up, but here worker count is getting set once and all threads are completing the work up to workers count,

use threads;
use Thread::Queue;
use threads::shared;

$count = 5; 
sub process
          {
             print "proccess started";
             return $count --;
           }


sub getWorkItems 
               {  
                 $n =&process;
                 return $n--;
               }
print "******************worker_count*********************\n";
 print $n;


     sub worker {
                  my $tid = threads->tid;
                  my( $Qwork, $Qresults ) = @_;
                  while( my $work = $Qwork->dequeue ) 
                       {  
                          sub result 
                               { my $i = threads->tid; 
                                  print "\nhello \n";
                                  print  "$i";
                                  print "\n"
                                }
                         &result;
                     $Qresults->enqueue();
                  }
              }

our $THREADS = 1;

my $Qwork = new Thread::Queue;
my $Qresults = new Thread::Queue;


## Create the pool of workers
my @pool = map
{
    threads->create( \&worker, $Qwork, $Qresults )

} 1 .. $THREADS;

## Get the work items (from somewhere)
## and queue them up for the workers

while( my $workItem = getWorkItems()  ) 
    {
      $Qwork->enqueue( $workItem );
     }

$Qwork->enqueue( (undef) x $THREADS );

for ( 1 .. $THREADS ) 
    {

    while( $Qresults->dequeue ) 
         {
         ## Do something with the result ##
          print "Request_proccessed";
          }

    }

## Clean up the threads
$_->join for @pool;

output :::

hello 1

hello 1

hello 1

hello 1

hello 1

Upvotes: 0

Views: 76

Answers (1)

ikegami
ikegami

Reputation: 386386

Before we start, let me mention that you should ALWAYS use use strict; use warnings;.

You have three major problems:

  • You print threads->tid, but I think you mean to print $work.
  • You have a named sub inside of another named sub. Never do that. There's no point in doing so —the inner sub isn't private to the outer sub— and it causes problems.
  • You never add anything to $Qresults, but the main thread expects there to be at least one false value in it.

Fixed and cleaned up code:

use strict;
use warnings;
use feature qw( say );
use threads;

use Thread::Queue 3.01;

use constant NUM_THREADS => 1;

sub worker {
   my ($job) = @_;
   return $job*2;
}

{
   my $job_q    = Thread::Queue->new();
   my $result_q = Thread::Queue->new();

   my @threads =
      map {
         async {
            while (defined( my $job = $job_q->dequeue() )) {
               my $result = worker($job);
               $result_q->enqueue($result);
            }
         }
      }
         1..NUM_THREADS;

   # Add some jobs to the queue.
   my $num_jobs = 0;
   for my $job (1..5) {
      $job_q->enqueue($job);
      ++$num_jobs;
   }

   $job_q->end();

   # Collect the result of the jobs.
   for (1..$num_jobs) {
      my $result = $result_q->dequeue();
      say "Result: $result";
   }

   $_->join for @threads;
}

Note how the worker doesn't need to know anything about the queues or threads.

Upvotes: 2

Related Questions