Reputation: 13
I am new to Perl, so for educational purposes I am developing a multithreaded server using AnyEvent and Coro. The client sends the server a list of directory paths, and the server responds with a listing of those directories.
I am using tcp_server and AnyEvent::Handle to handle connections. For each client, I want the server to check a thread pool (which is actually a pool of coros) for a free coro to handle the request. When a request has been handled, I want the coro to wait for another client instead of finishing.
However, it seems that at the end of the handle_request sub the coro is destroyed and is no longer available.
#!/usr/bin/perl
use strict;
use v5.18;
use AnyEvent;
use AnyEvent::Socket qw(tcp_server);
use AnyEvent::Handle;
use Coro;
use Class::Struct;
print("Server is running...\n");
# dirs function
sub print_dirs {
    my $dir_list = $_[0];
    my @dirs = split(" ", $dir_list);
    my $result = "";
    for my $dir (@dirs) {
        if (opendir my $dirent, $dir) {
            my @files = readdir $dirent;
            closedir $dirent;
            $result = $result . "\nContents of $dir:\r\n" . join("\r\n", @files) . "\r\n";
        } else {
            $result = $result . "Failed to open $dir: $!\r\n";
        }
    }
    return $result;
}
# thread struct
struct clt_thread => {
    id      => '$',
    thread  => '$',
    is_busy => '$',
    args    => '$',
    client  => '$',
    clt_key => '$'
};
my $threads_num = 16;
my $thread_id = 0;
my @pool = ();
# handling request
my $cv = AE::cv;
my %client = ();
sub handle_request {
    my $thread_id = shift;
    my $thread;
    foreach my $thr (@pool) {
        if ($thr->id == $thread_id) { $thread = $thr; }
    }
    my $self = $thread->client;
    my $client_key = $thread->clt_key;
    my $dir_list = $thread->args;
    # client is '' until a handle is assigned, so compare as a string
    if ($thread->client ne '') {
        say "Directories read: " . $dir_list . "\n";
        my @clients = keys %client;
        for my $key (grep {$_ ne $client_key} @clients) {
            my $response = print_dirs($dir_list);
            $client{$key}->push_write("$response");
            $self->push_shutdown;
            delete $client{$client_key};
            delete $client{$self};
        }
    }
    $thread->is_busy(0);
    Coro::cede();
}
# threads creation
# 1..$threads_num creates exactly $threads_num coros (0..$threads_num would create one extra)
for my $i (1..$threads_num) {
    my $coro = new Coro(\&handle_request, $thread_id);
    my $thread = clt_thread->new(id => $thread_id, thread => $coro, is_busy => 0, args => '', client => '', clt_key => '');
    push @pool, $thread;
    $thread_id = $thread_id+1;
}
# tcp server creation - main part
tcp_server '127.0.0.1', 8015, sub {
    my ($fh, $host, $port) = @_;
    my $client_key = "$host:$port";
    my $hdl = AnyEvent::Handle->new(
        fh => $fh,
        poll => 'r',
        on_read => sub {
            my ($self) = @_;
            foreach my $thr (@pool) {
                if (!($thr->is_busy)) {
                    $thr->client($self);
                    $thr->args($self->rbuf);
                    $thr->clt_key($client_key);
                    $thr->is_busy(1);
                    $thr->thread->ready();
                    return;
                }
            }
        },
        on_error => sub {
            say "Something went wrong: $!\n";
        },
    );
    $client{$client_key} = $hdl;
    $client{$hdl} = $hdl;
};
$cv->recv;
I have already tried using an infinite loop inside handle_request, but then everything stops working altogether. Do you have any ideas on how to fix that? I suppose using Coro::AnyEvent to integrate coroutines into the event loop might be a solution. Would it help in my case?
Thanks for your help.
Upvotes: 1
Views: 335
Reputation: 386216
The thread exits when handle_request exits, so you want to wrap the body of handle_request in an infinite loop.
You also want to use Coro::schedule; instead of Coro::cede; to wait for ->ready to be called again before continuing.
That first loop in handle_request can be reduced to my $thread = $pool[$thread_id];.
Untested fix:
sub handle_request {
    my ($thread_id) = @_;
    my $thread = $pool[$thread_id];
    while (1) {
        my $self = $thread->client;
        my $client_key = $thread->clt_key;
        my $dir_list = $thread->args;
        ...
        $thread->is_busy(0);
        Coro::schedule();
    }
}
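To see the park-and-wake behaviour in isolation, here is a minimal sketch without any networking. The one-slot mailbox $job and the @log array are hypothetical stand-ins for $thread->args and the real request handling; only Coro->new, ->ready, Coro::schedule and Coro::cede come from Coro itself.

```perl
use strict;
use warnings;
use Coro;

my @log;   # hypothetical: records what the worker handled
my $job;   # hypothetical one-slot mailbox, stands in for $thread->args

my $worker = Coro->new(sub {
    while (1) {
        # Guard against being woken with nothing to do.
        if (defined $job) {
            push @log, "handled $job";
            undef $job;
        }
        # Park here; this coro only runs again after someone calls ->ready on it.
        Coro::schedule();
    }
});

for my $request (qw(/tmp /etc)) {
    $job = $request;
    $worker->ready;   # put the worker into the ready queue
    Coro::cede();     # yield so the worker can run; the main coro stays ready
}

print "$_\n" for @log;
```

The main coro uses cede because it wants to keep running after the worker has had its turn, while the worker uses schedule because it has nothing to do until it is explicitly made ready again.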
That said, the following is the approach I'd use:
use Coro;
use Coro::Channel;
use constant NUM_WORKERS => 16;
sub worker {
    my ($job) = @_;
    # $job is the plain hash ref queued by on_read, so use hash access
    my $self       = $job->{client};
    my $client_key = $job->{clt_key};
    my $dir_list   = $job->{args};
    ...
}
{
    my $q = Coro::Channel->new();
    my @threads =
        map {
            async {
                while ( my $job = $q->get() ) {
                    eval { worker($job); 1 }
                        or warn $@;
                }
            }
        }
        1..NUM_WORKERS;
    ...
    on_read => sub {
        my ($self) = @_;
        $q->put({
            client  => $self,
            clt_key => $client_key,
            args    => $self->rbuf,
        });
    }
    ...
    $cv->recv;
    $q->shutdown;
    $_->join for @threads;
}
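The queue-based pool can also be tried on its own, without AnyEvent. The following is a sketch under assumptions: the job fields mirror the ones above, while the @results array, the "client$_" keys and the pool size of 4 are made up for illustration.

```perl
use strict;
use warnings;
use Coro;
use Coro::Channel;

use constant NUM_WORKERS => 4;   # hypothetical pool size for the demo

my @results;                     # hypothetical: collects what the workers produced
my $q = Coro::Channel->new;      # unbounded channel, so put() does not block

sub worker {
    my ($job) = @_;
    push @results, "$job->{clt_key}: $job->{args}";
}

my @threads;
for (1 .. NUM_WORKERS) {
    push @threads, async {
        # get() blocks this coro while the channel is empty and
        # returns undef once the channel is shut down and drained.
        while ( my $job = $q->get ) {
            eval { worker($job); 1 } or warn $@;
        }
    };
}

$q->put({ clt_key => "client$_", args => "/tmp" }) for 1 .. 6;
$q->shutdown;             # no more jobs; lets the worker loops end
$_->join for @threads;    # blocks the main coro until every worker has finished

print "$_\n" for @results;
```

No busy flags or manual ->ready calls are needed here: a worker is "free" exactly when it is blocked in get.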
This is the same approach I'd use with real threads (using Thread::Queue instead of Coro::Channel).
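For comparison, a rough sketch of the same shape with real threads. It assumes a perl built with thread support and a Thread::Queue recent enough to provide end; the second queue $results is a hypothetical way to get output back to the main thread, since ordinary arrays are not shared between threads.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

use constant NUM_WORKERS => 4;       # hypothetical pool size for the demo

my $q       = Thread::Queue->new;    # jobs for the workers
my $results = Thread::Queue->new;    # hypothetical: carries output back to the main thread

my @threads;
for (1 .. NUM_WORKERS) {
    push @threads, threads->create(sub {
        # dequeue blocks while the queue is empty and returns undef after end()
        while ( defined( my $job = $q->dequeue ) ) {
            $results->enqueue("$job->{clt_key}: $job->{args}");
        }
    });
}

$q->enqueue({ clt_key => "client$_", args => "/tmp" }) for 1 .. 6;
$q->end;                  # plays the role of $q->shutdown in the Coro version
$_->join for @threads;

$results->end;
my @results;
while ( defined( my $line = $results->dequeue ) ) {
    push @results, $line;
}
print "$_\n" for sort @results;
```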
Upvotes: 1