Reputation: 21
I am trying to pass a subroutine from a self-written module to threads using the following code.
This is my first time using threads, so I'm not very familiar with them.
Main script (shortened)
#!/usr/bin/perl -w
use strict;
use threads;
use lib 'PATH TO LIB';
use goldstandard;
my $delete_raw_files = 0;
my $outfolder = /PATH/;
my %folder = goldstandard -> create_folder($outfolder,$delete_raw_files);
&tagging if $tagging == 1;
sub tagging{
my %hash = goldstandard -> tagging_hash(\%folder);
my @threads;
foreach(keys %hash){
if($_ =~ m/mate/){
my $arguments = "goldstandard -> mate_tagging($hash{$_}{raw},$hash{$_}{temp},$hash{$_}{tagged},$mate_anna,$mate_model)";
push(@threads,$arguments);
}
if($_ =~ m/morpheus/){
my $arguments = "goldstandard -> morpheus_tagging($hash{$_}{source},$hash{$_}{tagged},$morpheus_stemlib,$morpheus_cruncher)";
push(@threads,$arguments)
}
}
foreach(@threads){
my $thread = threads->create($_);
$thread ->join();
}
}
Module
package goldstandard;
use strict;
use warnings;
sub mate_tagging{
my $Referenz = shift;
my $input = shift;
my $output_temp_dir = shift;
my $output_mate_human = shift;
my $anna = shift;
my $model = shift;
opendir(DIR,"$input");
my @dir = readdir(DIR);
my $anzahl = @dir;
foreach(@dir){
unless($_ =~ m/^\./){
my $name = $_;
my $path = $input . $_;
my $out_temp = $output_temp_dir . $name;
my $out_mate_human_final = $output_mate_human . $name;
qx(java -Xmx10G -classpath $anna is2.tag.Tagger -model $model -test $path -out $out_temp);
open(OUT, "> $out_mate_human_final");
open(TEMP, "< $out_temp");
my $output_text;
while(<TEMP>){
unless($_ =~ m/^\s+$/){
if ($_ =~ m/^\d+\t(.*?)\t_\t_\t_\t(.*?)\t_\t/) {
my $tags = $2;
my $words = $1;
print OUT "$words\t$tags\n";
}
}
}
}
}
}
sub morpheus_tagging{
my $Referenz = shift;
my $input = shift;
my $output = shift;
my $stemlib = shift;
my $cruncher = shift;
opendir(DIR,"$input");
my @dir = readdir(DIR);
foreach(@dir){
unless($_ =~ m/^\./){
my $name = $_;
my $path = $input . $_;
my $out = $output . $name;
qx(env MORPHLIB='$stemlib' '$cruncher' < '$path' > '$out');
}
}
}
1;
Executing this code gets me
Thread 1 terminated abnormally: Undefined subroutine &main::goldstandard -> morpheus_tagging(...) called at ... line 43.
I guess either the way I am calling the threads or the way I am providing the arguments is wrong. I hope someone can help me with that. I also found something on safe and unsafe modules, but I'm not sure if this is really the problem. Thanks in advance.
Upvotes: 1
Views: 114
Reputation: 385916
You must pass the name of a sub or a reference to a sub, plus arguments, to threads->create. So you need something like:
my $method_ref = $invoker->can($method_name);
threads->create($method_ref, $invoker, @args);
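As a minimal runnable sketch of the two lines above: in the question, the whole string "goldstandard -> morpheus_tagging(...)" was handed to threads->create, so it was treated as the name of a sub in main, which is what the error message reports. The package Demo::Tagger and its tag method below are hypothetical stand-ins for the goldstandard module, and the sketch assumes a perl built with thread support.

```perl
use strict;
use warnings;
use threads;

# Hypothetical stand-in for the goldstandard module.
package Demo::Tagger {
    sub tag {
        my ($class, $in, $out) = @_;
        return "$class: $in -> $out";
    }
}

my $invoker     = 'Demo::Tagger';
my $method_name = 'tag';

# can() returns a code reference to the method, or a false value if
# the class has no such method.
my $method_ref = $invoker->can($method_name)
    or die "$invoker has no method $method_name";

# The invoker is passed explicitly as the first argument, which is
# what a normal method call does behind the scenes.
my $thread = threads->create($method_ref, $invoker, 'raw/', 'tagged/');

# The thread was created in scalar context, so join() returns a scalar.
my $result = $thread->join();
print "$result\n";
```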
That said, passing arguments to threads->create has issues that can be avoided by using a closure:
threads->create(sub { $invoker->$method_name(@args) })
The above can be written more simply as follows:
async { $invoker->$method_name(@args) }
This gets us the following:
sub tagging {
    my %hash = goldstandard->tagging_hash(\%folder);
    my @jobs;
    for (keys %hash) {
        if (/mate/) {
            push @jobs, [ 'goldstandard', 'mate_tagging',
                $hash{$_}{raw},
                $hash{$_}{temp},
                $hash{$_}{tagged},
                $mate_anna,
                $mate_model,
            ];
        }
        if (/morpheus/) {
            push @jobs, [ 'goldstandard', 'morpheus_tagging',
                $hash{$_}{source},
                $hash{$_}{tagged},
                $morpheus_stemlib,
                $morpheus_cruncher,
            ];
        }
    }
    my @threads;
    for my $job (@jobs) {
        my ($invoker, $method_name, @args) = @$job;
        push @threads, async { $invoker->$method_name(@args) };
    }
    $_->join for @threads;
}
or just
sub tagging {
    my %hash = goldstandard->tagging_hash(\%folder);
    my @threads;
    for (keys %hash) {
        if (/mate/) {
            push @threads, async {
                goldstandard->mate_tagging(
                    $hash{$_}{raw},
                    $hash{$_}{temp},
                    $hash{$_}{tagged},
                    $mate_anna,
                    $mate_model,
                );
            };
        }
        if (/morpheus/) {
            push @threads, async {
                goldstandard->morpheus_tagging(
                    $hash{$_}{source},
                    $hash{$_}{tagged},
                    $morpheus_stemlib,
                    $morpheus_cruncher,
                );
            };
        }
    }
    $_->join for @threads;
}
Note that I delayed the calls to join until after all the threads are created. Your way made it so only one thread would run at a time.
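To illustrate the difference, here is a small timing sketch. The slow_job sub is hypothetical and just sleeps to stand in for a slow tagger run; the exact timings will vary by machine, but joining after all threads are created should take roughly the time of one job, while joining right after each creation should take roughly the sum.

```perl
use strict;
use warnings;
use threads;
use Time::HiRes qw( time sleep );

# Hypothetical job: sleeps to stand in for a slow tagger run.
sub slow_job {
    my ($name) = @_;
    sleep 0.5;
    return $name;
}

my @names = qw( mate morpheus );

# Create every thread first, then join them, so the jobs overlap.
my $start = time;
my @threads;
for my $name (@names) {
    my $thread = async { slow_job($name) };
    push @threads, $thread;
}
my @done = map { scalar $_->join() } @threads;
my $parallel = time - $start;

# Join right after creation: each job must finish before the next starts.
$start = time;
my @done_serial;
for my $name (@names) {
    my $thread = async { slow_job($name) };
    push @done_serial, scalar $thread->join();
}
my $serial = time - $start;

printf "joined at the end: %.2fs, joined immediately: %.2fs\n",
    $parallel, $serial;
```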
But what we have isn't great. We have no way of limiting how many threads are active at a time, and we (expensively) create many threads instead of reusing them. We can use a worker pool to solve both of these problems.
use constant NUM_WORKERS => 5;
use Thread::Queue 3.01 qw( );
my $q;
sub tagging {
    my %hash = goldstandard->tagging_hash(\%folder);
    for (keys %hash) {
        if (/mate/) {
            # Queue a job description (method name plus arguments), not a
            # code ref: Thread::Queue cannot share code refs between threads.
            $q->enqueue([ 'mate_tagging',
                $hash{$_}{raw},
                $hash{$_}{temp},
                $hash{$_}{tagged},
                $mate_anna,
                $mate_model,
            ]);
        }
        if (/morpheus/) {
            $q->enqueue([ 'morpheus_tagging',
                $hash{$_}{source},
                $hash{$_}{tagged},
                $morpheus_stemlib,
                $morpheus_cruncher,
            ]);
        }
    }
}
{
    $q = Thread::Queue->new();
    for (1..NUM_WORKERS) {
        async {
            # dequeue blocks until a job is available, and returns
            # undef once the queue has been ended and emptied.
            while ( my $job = $q->dequeue() ) {
                my ($method_name, @args) = @$job;
                goldstandard->$method_name(@args);
            }
        };
    }
    # ... call tagging and whatever ...
    $q->end();
    $_->join() for threads->list();
}
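For experimenting with the pattern outside the real project, here is a self-contained sketch of a worker pool. The package Demo::Tagger and the paths are hypothetical stand-ins for the goldstandard module and its inputs. The jobs are queued as plain array references (method name plus arguments) rather than code references, because Thread::Queue shares queued items between threads through threads::shared, which does not support code references. A second queue collects the results.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue 3.01 qw( );

use constant NUM_WORKERS => 3;

# Hypothetical stand-in for the goldstandard module.
package Demo::Tagger {
    sub mate_tagging {
        my ($class, $in, $out) = @_;
        return "mate: $in -> $out";
    }
    sub morpheus_tagging {
        my ($class, $in, $out) = @_;
        return "morpheus: $in -> $out";
    }
}

my $jobs    = Thread::Queue->new();
my $results = Thread::Queue->new();

# Start a fixed number of workers; each one handles many jobs.
my @workers;
for (1 .. NUM_WORKERS) {
    push @workers, threads->create(sub {
        # dequeue() blocks until a job arrives and returns undef
        # once the queue has been ended and emptied.
        while ( defined( my $job = $jobs->dequeue() ) ) {
            my ($method_name, @args) = @$job;
            $results->enqueue( Demo::Tagger->$method_name(@args) );
        }
    });
}

# Queue the work as data: method name followed by its arguments.
$jobs->enqueue([ 'mate_tagging',     'raw/a',    'tagged/a' ]);
$jobs->enqueue([ 'morpheus_tagging', 'source/b', 'tagged/b' ]);
$jobs->enqueue([ 'mate_tagging',     'raw/c',    'tagged/c' ]);

# Signal that no more jobs are coming, then wait for the workers.
$jobs->end();
$_->join() for @workers;

# Collect the results; sort them because completion order is not fixed.
my @lines;
while ( defined( my $line = $results->dequeue_nb() ) ) {
    push @lines, $line;
}
@lines = sort @lines;
print "$_\n" for @lines;
```

Passing the method name as data keeps the queue contents shareable; the worker turns it back into a method call with `Demo::Tagger->$method_name(@args)`.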
Upvotes: 4