keety

Problems copying a shared hash in Perl threads

I've run into what looks like strange behavior with a shared hash in Perl and need some help understanding it.

The actual problem is in a far larger code base, and I have tried to reduce it to a smaller reproducible script.

Essentially, I have a shared variable that looks something along these lines:

 my %headers :shared= map { lc($_) => $custom_headers->{$_} }  keys %{$custom_headers};   
 my %task1_request :shared; 
 $task1_request{count} = $count;
 $task1_request{header} = \%headers if(keys %headers);

That is, I end up passing a reference to the shared hash %headers to two separate threads.

Each of these threads performs only "read-only" operations on the hash behind that reference.

However, it looks like passing a copy of the shared hash to a function inside the thread, as in the example below:

iterate_header($request->{count},%{$request->{header}});

sub iterate_header
{
    my $count = shift;
    my $current_count = scalar(@_);
    if($count != $current_count) {
      print STDERR "Test failed Expected: $count, Actual : $current_count \n";
    }
    else {
      print STDERR "Test passed\n" ;
    }
}

results in the copied hash being corrupt, i.e. the @_ in iterate_header is corrupt (it has the wrong number of elements).

My guess is that the iterator of a shared hash is global, so copying the hash is not thread safe. That is only an assumption on my part, though. Can someone clarify why copying a shared hash causes this seemingly strange behavior, and whether it is expected?

The reproducer script is below:

use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;

# called once per request per worker (loop iterations * 2 workers)
sub iterate_header
{
    my $count = shift;
    my $current_count = scalar(@_);
    if($count != $current_count) {
      print STDERR "Test failed Expected: $count, Actual : $current_count \n";
    }
    else {
      print STDERR "Test passed\n" ;
    }
}

sub request_loop {
    my ($request_queue) = @_;

    # wait for the next request...
    while (defined(my $request = $request_queue->dequeue())) {
        my %result :shared;
        if(exists($request->{header})) {
            iterate_header($request->{count},%{$request->{header}});
        }
        last if(exists($request->{exit}));
        $result{is_success} = "200";
    }
}

# Main program
# create thread queues
my $task1_request_queue = Thread::Queue->new();   
my $task2_request_queue = Thread::Queue->new();    

# start worker threads
my $task1_worker = threads->create(\&request_loop, $task1_request_queue);
my $task2_worker = threads->create(\&request_loop, $task2_request_queue);

# a high number to ensure tests fail
 my $test_count = 100; 
 my $custom_headers = {
        "key" => "558193F28878E5FE",
        "username" => "Mastodon",
        "real_username" => "Mastodon",
        "type" => "EMPLOYEE",
        "expiration" => "1434556278",
        "env" => "save it",
        "for" => "some ip",
        "long-string" => "This islong string",
        "state" => "internal",
        "account" => "home",
        "original_account" => "home",
        "key" => "MCwCFAPOE74uvXso5alKytqjlfpdqeY4AhRpDeIMLCAk3ciBcyDXLdnyZjC/7Q==",
        "charset" => "iso-8859-1,*,utf-8",
        "agent" => "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Workstation/2013.9.213.116 Safari/535.19",
        "accept" => "application/json, text/plain, */*",
        "encoding" => "gzip,deflate",
        "language" => "en-us,en",
        "cookie" => "TS01375c99=012e7f4fa1e82941689f22669e2e6403ce1c75f9f8c7cb86de86c19a887f61a1109c6e2aae",
        "created" => "1434555378",

    };


my @data = %{$custom_headers};
my $count = scalar(@data);
print STDERR  "Expected Count for all tests:$count\n";
for(my $i = 0;$i < 2; $i++) {
  my %headers :shared= map { lc($_) => $custom_headers->{$_} }  keys %{$custom_headers};   
  #add to task1 q
    {    

        my %task1_request :shared; 
        $task1_request{count} = $count;

        $task1_request{header} = \%headers if(keys %headers);

        $task1_request_queue->enqueue(\%task1_request);
    }

    # add to task2 q
    {
        my %task2_request :shared; 
        $task2_request{count} = $count;

        $task2_request{header} = \%headers if(keys %headers);
        $task2_request_queue->enqueue(\%task2_request);
    }
}

my %end_request :shared = (exit => 1);
$task1_request_queue->enqueue(\%end_request);
$task2_request_queue->enqueue(\%end_request);

$task1_worker->join();
$task2_worker->join();
print "testing done\n";

Example output from two test runs:

[]$ perl thread_shared_issue.pl
Expected Count for all tests:36
Test passed
Test passed
Test passed
Test passed
testing done
[]$ perl thread_shared_issue.pl
Expected Count for all tests:36
Test failed Expected: 36, Actual : 16
Test failed Expected: 36, Actual : 60
Test failed Expected: 36, Actual : 18
Test failed Expected: 36, Actual : 56
testing done

Perl version tested with:

perl -version

This is perl 5, version 12, subversion 5 (v5.12.5) built for x86_64-linux-thread-multi

Answers (1)

ikegami

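Two threads are iterating over the same hash at the same time, so they are both changing its iterator. Flattening the hash with %{$request->{header}} in list context walks it with that one iterator, which is why @_ ends up with too few or too many elements. You need to make sure that no more than one thread uses the hash's iterator at a time.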
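If the shared hash has to stay, one way to keep two threads off the iterator at the same time is to take a lock on the hash while copying it. The following is only a minimal sketch under assumptions: `snapshot` is a hypothetical helper that is not part of the original script, the header names are made up, and every thread that iterates the hash would have to go through the same lock.

```perl
use strict;
use warnings;
use threads;
use threads::shared;

# Made-up stand-in for the question's %headers (18 keys => 36 list elements).
my %headers :shared = map { ("h$_" => "v$_") } 1 .. 18;
my $expected = 2 * keys %headers;

# Hypothetical helper: copy a shared hash into a private one while holding
# its lock, so only one thread at a time drives the hash's iterator.
sub snapshot {
    my ($shared) = @_;
    lock(%$shared);            # released automatically when the sub returns
    my %copy = %$shared;       # iteration happens under the lock
    return \%copy;
}

# Two workers copy the same shared hash repeatedly and count bad copies.
my @workers = map {
    threads->create(sub {
        my $bad = 0;
        for (1 .. 1000) {
            my $copy = snapshot(\%headers);
            $bad++ if 2 * keys(%$copy) != $expected;
        }
        return $bad;
    });
} 1 .. 2;

my $failures = 0;
$failures += $_->join() for @workers;
print "failures: $failures\n";
```

In the question's script this would mean calling something like `iterate_header($request->{count}, %{ snapshot($request->{header}) })` instead of flattening the shared hash directly.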

I'd remove all those :shared attributes and use Thread::Queue::Any.
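As a rough illustration of the "no :shared" approach, here is a sketch that uses plain Thread::Queue rather than Thread::Queue::Any (a swapped-in module, chosen because it ships with Perl). It assumes a Thread::Queue version that clones unshared references on enqueue, which is what its documentation describes; the `worker` sub and the header names are hypothetical. Because each enqueue makes its own clone, each worker iterates a hash that no other thread touches.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $queue_a = Thread::Queue->new();
my $queue_b = Thread::Queue->new();

# Hypothetical worker: flattens the header hash of each request and counts
# requests whose element count does not match the expected one.
sub worker {
    my ($queue) = @_;
    my $bad = 0;
    while (defined(my $request = $queue->dequeue())) {
        my @flat = %{ $request->{header} };
        $bad++ if @flat != $request->{count};
    }
    return $bad;
}

my @workers = map { threads->create(\&worker, $_) } $queue_a, $queue_b;

# Plain, unshared data: enqueue() clones it separately for each queue.
my %headers = map { ("h$_" => "v$_") } 1 .. 18;
my $count   = 2 * keys %headers;

for my $i (1 .. 100) {
    $_->enqueue({ count => $count, header => \%headers }) for $queue_a, $queue_b;
}
$_->enqueue(undef) for $queue_a, $queue_b;    # undef ends each worker's loop

my $failures = 0;
$failures += $_->join() for @workers;
print "failures: $failures\n";
```

The trade-off is an extra copy of the headers per request, in exchange for not having to coordinate access to a single shared hash.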
