Lorem Ipsum

Reputation: 21

Perl: passing a module subroutine to threads

I am trying to pass a subroutine from a self-written module to threads using the following code.

This is my first time using threads, so I'm not very familiar with them.

Main script (shortened)

#!/usr/bin/perl -w
use strict;
use threads;

use lib 'PATH TO LIB';
use goldstandard;

my $delete_raw_files = 0;
my $outfolder = /PATH/;
my %folder = goldstandard -> create_folder($outfolder,$delete_raw_files);

&tagging if $tagging == 1;

sub tagging{
    my %hash = goldstandard -> tagging_hash(\%folder);
    my @threads;
    foreach(keys %hash){
        if($_ =~ m/mate/){
            my $arguments = "goldstandard -> mate_tagging($hash{$_}{raw},$hash{$_}{temp},$hash{$_}{tagged},$mate_anna,$mate_model)";
            push(@threads,$arguments);
        }
        if($_ =~ m/morpheus/){
            my $arguments = "goldstandard -> morpheus_tagging($hash{$_}{source},$hash{$_}{tagged},$morpheus_stemlib,$morpheus_cruncher)";
            push(@threads,$arguments)
        }
    }
    foreach(@threads){
        my $thread = threads->create($_);
        $thread ->join();
    }
}

Module

package goldstandard;
use strict;
use warnings;
sub mate_tagging{
    my $Referenz = shift;
    my $input = shift;
    my $output_temp_dir = shift;
    my $output_mate_human = shift;
    my $anna = shift;
    my $model = shift;
    opendir(DIR,"$input");                                                  
    my @dir = readdir(DIR);
    my $anzahl = @dir;
    foreach(@dir){
        unless($_ =~ m/^\./){
            my $name = $_;
            my $path = $input . $_;
            my $out_temp = $output_temp_dir . $name;
            my $out_mate_human_final = $output_mate_human . $name;
            qx(java -Xmx10G -classpath $anna is2.tag.Tagger -model $model -test $path -out $out_temp);
            open(OUT, "> $out_mate_human_final");
            open(TEMP, "< $out_temp");
            my $output_text;
            while(<TEMP>){
                unless($_ =~ m/^\s+$/){
                    if ($_ =~ m/^\d+\t(.*?)\t_\t_\t_\t(.*?)\t_\t/) {
                        my $tags = $2;
                        my $words = $1;
                        print OUT "$words\t$tags\n";
                    }
                }
            }
        }
    }
}

sub morpheus_tagging{
    my $Referenz = shift;
    my $input = shift;
    my $output = shift;
    my $stemlib = shift;
    my $cruncher = shift;
    opendir(DIR,"$input");                                                  
    my @dir = readdir(DIR);
    foreach(@dir){
        unless($_ =~ m/^\./){
            my $name = $_;
            my $path = $input . $_;
            my $out = $output . $name;
            qx(env MORPHLIB='$stemlib' '$cruncher' < '$path' > '$out');
        }
    }
}

1;

Executing this code gets me

Thread 1 terminated abnormally: Undefined subroutine &main::goldstandard -> morpheus_tagging(...) called at ... line 43.

I guess either the way I am calling the threads or the way I am providing the arguments is wrong. I hope someone can help me with that. I also found something on safe and unsafe modules, but I'm not sure if this is really the problem. Thanks in advance.

Upvotes: 1

Views: 114

Answers (1)

ikegami

Reputation: 385916

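You must pass the name of a sub or a reference to a sub, plus arguments, to threads->create. You passed a string containing a whole method-call expression, which threads treats as the name of a sub in main, hence the "Undefined subroutine" error. So you need something like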

my $method_ref = $invoker->can($method_name);
threads->create($method_ref, $invoker, @args);
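To make this concrete, here is a minimal self-contained sketch. The package Demo and its method greet are hypothetical stand-ins for goldstandard and its tagging methods, and the sketch assumes a perl built with thread support. can returns a code reference to the method, and the invoker has to be passed explicitly as the first argument because the thread calls the sub as a plain function:

```perl
use strict;
use warnings;
use threads;

# Hypothetical stand-in for the goldstandard module.
package Demo {
    sub greet {
        my ($class, $name) = @_;    # the invoker arrives as the first argument
        return "$class says hello to $name";
    }
}

my $invoker     = 'Demo';
my $method_name = 'greet';

# can() returns a code ref to the method, or a false value if it does not exist.
my $method_ref = $invoker->can($method_name)
    or die "No method $method_name in $invoker";

# The thread is created in scalar context, so join returns a scalar.
my $thread = threads->create($method_ref, $invoker, 'world');
my $result = $thread->join;

print "$result\n";
```

Checking the return value of can before creating the thread turns a misspelled method name into an immediate error in the main thread instead of an abnormal thread termination.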

That said, passing arguments to threads->create has issues that can be avoided by using a closure.

threads->create(sub { $invoker->$method_name(@args) })

The above can be written more simply as follows:

async { $invoker->$method_name(@args) }

This gets us the following:

sub tagging {
    my %hash = goldstandard->tagging_hash(\%folder);

    my @jobs;
    for (keys %hash) {
        if (/mate/) {
            push @jobs, [ 'goldstandard', 'mate_tagging',
                $hash{$_}{raw},
                $hash{$_}{temp},
                $hash{$_}{tagged},
                $mate_anna,
                $mate_model,
            ];
        }

        if (/morpheus/) {
            push @jobs, [ 'goldstandard', 'morpheus_tagging',
                $hash{$_}{source},
                $hash{$_}{tagged},
                $morpheus_stemlib,
                $morpheus_cruncher,
            ];
        }
    }

    my @threads;
    for my $job (@jobs) {
        my ($invoker, $method_name, @args) = @$job;
        push @threads, async { $invoker->$method_name(@args) };
    }

    $_->join for @threads;
}

or just

sub tagging {
    my %hash = goldstandard->tagging_hash(\%folder);

    my @threads;
    for (keys %hash) {
        if (/mate/) {
            push @threads, async {
                goldstandard->mate_tagging(
                    $hash{$_}{raw},
                    $hash{$_}{temp},
                    $hash{$_}{tagged},
                    $mate_anna,
                    $mate_model,
                );
            };
        }

        if (/morpheus/) {
            push @threads, async {
               goldstandard->morpheus_tagging(
                   $hash{$_}{source},
                   $hash{$_}{tagged},
                   $morpheus_stemlib,
                   $morpheus_cruncher,
               );
            };
        }
    }

    $_->join for @threads;
}

Note that I delayed the calls to join until after all the threads are created. Your way made it so only one thread would run at a time, because each thread was joined (waited for) before the next one was created.
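The effect of joining late can be seen in a small sketch. The sub slow_double is a hypothetical job that just sleeps and returns a value, and a perl built with thread support is assumed. Because all four threads exist before the first join, their sleeps overlap and the total time should stay close to that of a single job:

```perl
use strict;
use warnings;
use threads;
use Time::HiRes qw( time sleep );

# Hypothetical job: pretend to work for a short while, then return a value.
sub slow_double {
    my ($n) = @_;
    sleep 0.5;
    return 2 * $n;
}

my $start = time;

# Create every thread first...
my @threads;
for my $n (1 .. 4) {
    # The block closes over the lexical $n of this iteration.
    push @threads, async { slow_double($n) };
}

# ...and only then wait for them, so the jobs run concurrently.
my @results = map { $_->join } @threads;

printf "results: %s (%.1fs elapsed)\n", "@results", time - $start;
```

Moving the join inside the first loop would give the same results but roughly four times the elapsed time, which is what the code in the question did.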

But what we have isn't great. We have no way of limiting how many threads are active at a time, and we (expensively) create many threads instead of reusing them. We can use a worker pool to solve both of these problems.

use constant NUM_WORKERS => 5;

use Thread::Queue 3.01 qw( );

my $q;

sub tagging {
    my %hash = goldstandard->tagging_hash(\%folder);

    # Queue items must be shareable data (code refs are not supported),
    # so each job is an array ref: method name followed by its arguments.
    for (keys %hash) {
        if (/mate/) {
            $q->enqueue([ 'mate_tagging',
                $hash{$_}{raw},
                $hash{$_}{temp},
                $hash{$_}{tagged},
                $mate_anna,
                $mate_model,
            ]);
        }

        if (/morpheus/) {
            $q->enqueue([ 'morpheus_tagging',
                $hash{$_}{source},
                $hash{$_}{tagged},
                $morpheus_stemlib,
                $morpheus_cruncher,
            ]);
        }
    }
}

{
    $q = Thread::Queue->new();

    # Start the workers; each one blocks in dequeue until a job arrives.
    for (1..NUM_WORKERS) {
        async {
            while ( my $job = $q->dequeue() ) {
                my ($method_name, @args) = @$job;
                goldstandard->$method_name(@args);
            }
        };
    }

    ... call tagging and whatever ...

    # After end(), dequeue returns undef once the queue is empty,
    # which lets the workers leave their loops.
    $q->end();
    $_->join() for threads->list();
}
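For experimenting with the pool pattern in isolation, here is a runnable sketch under a few assumptions: the package Demo with its method square is a hypothetical stand-in for goldstandard, the second queue for collecting results is an addition not present above, and a perl built with thread support is assumed. Jobs are passed as array refs of a method name plus arguments, since Thread::Queue items have to be data that threads::shared can clone:

```perl
use strict;
use warnings;
use threads;
use Thread::Queue 3.01 qw( );

use constant NUM_WORKERS => 3;

# Hypothetical stand-in for the goldstandard module.
package Demo {
    sub square { my ($class, $n) = @_; return $n * $n }
}

my $jobs    = Thread::Queue->new();
my $results = Thread::Queue->new();    # assumption: results come back via a second queue

# Start the workers first; each blocks in dequeue until a job arrives.
for (1 .. NUM_WORKERS) {
    async {
        while ( my $job = $jobs->dequeue() ) {
            my ($method_name, @args) = @$job;
            $results->enqueue( Demo->$method_name(@args) );
        }
    };
}

# Each job is a plain array ref: method name plus arguments.
$jobs->enqueue([ 'square', $_ ]) for 1 .. 5;

$jobs->end();                      # workers see undef once the queue is drained
$_->join() for threads->list();

# All workers are done, so every result is already in the queue.
$results->end();
my @squares;
while ( defined( my $r = $results->dequeue() ) ) {
    push @squares, $r;
}
@squares = sort { $a <=> $b } @squares;

print "@squares\n";
```

The results are sorted before printing because the order in which workers finish is not deterministic.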

Upvotes: 4
