Reputation: 9
I am trying the code below, but the queue is not refilled automatically. After thread 1 completes, it should check for a new worker count and queue the new items up. Instead, the worker count is set once, and the threads only complete the work up to that count.
use threads;
use Thread::Queue;
use threads::shared;

$count = 5;

sub process
{
    print "proccess started";
    return $count --;
}

sub getWorkItems
{
    $n =&process;
    return $n--;
}

print "******************worker_count*********************\n";
print $n;

sub worker {
    my $tid = threads->tid;
    my( $Qwork, $Qresults ) = @_;
    while( my $work = $Qwork->dequeue )
    {
        sub result
        {
            my $i = threads->tid;
            print "\nhello \n";
            print "$i";
            print "\n"
        }
        &result;
        $Qresults->enqueue();
    }
}

our $THREADS = 1;
my $Qwork = new Thread::Queue;
my $Qresults = new Thread::Queue;

## Create the pool of workers
my @pool = map
{
    threads->create( \&worker, $Qwork, $Qresults )
} 1 .. $THREADS;

## Get the work items (from somewhere)
## and queue them up for the workers
while( my $workItem = getWorkItems() )
{
    $Qwork->enqueue( $workItem );
}
$Qwork->enqueue( (undef) x $THREADS );

for ( 1 .. $THREADS )
{
    while( $Qresults->dequeue )
    {
        ## Do something with the result ##
        print "Request_proccessed";
    }
}

## Clean up the threads
$_->join for @pool;
Output:
hello 1
hello 1
hello 1
hello 1
hello 1
Upvotes: 0
Views: 76
Reputation: 386386
Before we start, let me mention that you should ALWAYS put use strict; and use warnings; at the top of your code.
You have three major problems. Among them:
You print threads->tid, but I think you mean to print $work.
You never add anything to $Qresults, but the main thread expects there to be at least one false value in it.
Fixed and cleaned up code:
use strict;
use warnings;
use feature qw( say );

use threads;
use Thread::Queue 3.01;

use constant NUM_THREADS => 1;

sub worker {
    my ($job) = @_;
    return $job*2;
}

{
    my $job_q    = Thread::Queue->new();
    my $result_q = Thread::Queue->new();

    my @threads =
        map {
            async {
                while (defined( my $job = $job_q->dequeue() )) {
                    my $result = worker($job);
                    $result_q->enqueue($result);
                }
            }
        }
            1..NUM_THREADS;

    # Add some jobs to the queue.
    my $num_jobs = 0;
    for my $job (1..5) {
        $job_q->enqueue($job);
        ++$num_jobs;
    }

    $job_q->end();

    # Collect the result of the jobs.
    for (1..$num_jobs) {
        my $result = $result_q->dequeue();
        say "Result: $result";
    }

    $_->join for @threads;
}
Note how the worker doesn't need to know anything about the queues or threads.
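The fixed code above tests the return value of dequeue() with defined() instead of testing it for truth, as the question's loops do. Here is a minimal sketch of why that matters, assuming a job or result value of 0 can legitimately appear in a queue. The helpers drain_truthy, drain_defined and make_queue are hypothetical names made up for this illustration, and the sketch runs single-threaded to keep it short.

```perl
use strict;
use warnings;
use feature qw( say );

use Thread::Queue 3.01;

# Drain a queue with a truthiness test, as the question's loops do.
# Any false item (0, "", undef) ends the loop early.
sub drain_truthy {
    my ($q) = @_;
    my @seen;
    while ( my $item = $q->dequeue() ) {
        push @seen, $item;
    }
    return @seen;
}

# Drain a queue with a defined() test, as the fixed code does.
# Only undef, which dequeue() returns once the queue is ended
# and empty, stops the loop.
sub drain_defined {
    my ($q) = @_;
    my @seen;
    while ( defined( my $item = $q->dequeue() ) ) {
        push @seen, $item;
    }
    return @seen;
}

# Hypothetical helper: build a queue holding the given items and end it,
# so that dequeue() never blocks in this single-threaded sketch.
sub make_queue {
    my $q = Thread::Queue->new(@_);
    $q->end();
    return $q;
}

my @truthy  = drain_truthy(  make_queue( 3, 0, 7 ) );
my @defined = drain_defined( make_queue( 3, 0, 7 ) );

say "truthy test saw:  @truthy";     # 3
say "defined test saw: @defined";    # 3 0 7
```

With the truthiness test, the item 0 ends the loop and 7 is never processed. The defined() test only stops when the queue has been ended and is empty, which is what lets end() replace the undef sentinels used in the question.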
Upvotes: 2