Jeya Kumar

Reputation: 1102

Perl parallel crawler multithreading

I have a multi-threaded Perl crawler that works fine when I declare the URLs in an array. However, when I read the URLs from a database I get a "segmentation fault" error. Please help me fix this issue. Thanks.

Direct URL declaration

use 5.012;
use warnings;
use threads;
use Thread::Queue;
use LWP::UserAgent;

use constant THREADS => 10;

my $queue = Thread::Queue->new();
my @URLs = qw(
    http://www.example.com
    http://www.example.com1
    http://www.example.com2
);

print @URLs;
my @threads;

for (1..THREADS) {
    push @threads, threads->create(sub {
        my $ua = LWP::UserAgent->new;
        $ua->timeout(5); # short timeout for easy testing.
        while(my $task = $queue->dequeue) {
            my $response = eval{ $ua->get($task)->status_line };
            say "$task --> $response";
        }
    });
}

$queue->enqueue(@URLs);
$queue->enqueue(undef) for 1..THREADS;
# ... here work is done
$_->join foreach @threads;

Trying to read the URLs from DB

use DBI;

my $dbh = DBI->connect("DBI:mysql:$database;host=$server", $username, $password)   # connect to the database
    || die "Could not connect to database: $DBI::errstr";

my $sth = $dbh->prepare('select cname,url,xpath,region from competitors')    #query to select required fields
    || die "$DBI::errstr";

$sth->execute();

if ($sth->rows < 0) {
    print "Sorry, no domains found.\n";
}
else {                                                
    while (my $results = $sth->fetchrow_hashref) {
        my $competitor= $results->{cname};                      
        my $url = $results->{url};                         
        my $xpath = $results->{xpath};
        my $region = $results->{region};

        push(my @all,$url);   

        use constant THREADS => 10;
        my $queue = Thread::Queue->new();
        my @URLs=@all;
        my @threads;

        for (1..THREADS) {
            push @threads, threads->create(sub {
                my $ua = LWP::UserAgent->new;
                $ua->timeout(500); # generous timeout while testing.
                while(my $task = $queue->dequeue) {
                    my $response = eval{ $ua->get($task)->status_line };
                    print  "$task --> $response";
                }
            });
        }

        $queue->enqueue( @URLs);
        $queue->enqueue(undef) for 1..THREADS;
        # ... here work is done
        $_->join foreach @threads;
    }

}  #close db

$sth->finish;
$dbh->disconnect;

Expected output

www.example.com --> 200 OK

www.example.com1 --> 200 OK

Current output

Segmentation fault

Upvotes: 0

Views: 126

Answers (3)

ikegami

Reputation: 385809

Your $sth and $dbh are still around when you create the threads, so each thread gets a copy of them, which is a no-no.

Newly created threads must make their own connections to the database; handles can't be shared across threads.
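If a thread itself needs the database, it must open its own handle inside the thread. A minimal sketch, reusing the connection details and competitors table from the question:

use threads;
use DBI;

my $thr = threads->create(sub {
   # Each thread opens (and closes) its own connection;
   # never reuse a $dbh created before the thread started.
   my $dbh = DBI->connect(
      "DBI:mysql:$database;host=$server", $username, $password,
      { RaiseError => 1 },
   );
   my $urls = $dbh->selectcol_arrayref('SELECT url FROM competitors');
   print "$_\n" for @$urls;
   $dbh->disconnect;
});
$thr->join();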

Better scoping of your variables should avoid the problem.

use strict;
use warnings;
use threads;
use Thread::Queue 3.01 qw( );
use DBI;
use LWP::UserAgent;

use constant NUM_WORKERS => 10;

sub worker {
   my ($ua, $url) = @_;
   ...
}

{
   my $q = Thread::Queue->new();

   for (1..NUM_WORKERS) {
      async {
         my $ua = LWP::UserAgent->new();
         while ( my $url = $q->dequeue() ) {
            eval { worker($ua, $url); 1 }
               or warn $@;
         }
      };
   }

   {
      my $dbh = DBI->connect(..., { RaiseError => 1 });
      my $sth = $dbh->prepare('SELECT ...');
      $sth->execute();
      while ( my $row = $sth->fetchrow_hashref() ) {
         $q->enqueue($row->{url});
      }
   }

   $q->end();
   $_->join for threads->list;
}
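The worker body above is deliberately left as a stub. Based on the fetch in the question's own code, it might look like this (illustrative only):

use LWP::UserAgent;

sub worker {
   my ($ua, $url) = @_;
   my $response = $ua->get($url);    # fetch the page
   print "$url --> ", $response->status_line, "\n";
}

Note that Thread::Queue 3.01 is pinned because end() was added in that version; once the queue is ended and drained, blocked dequeue() calls return undef and the workers exit cleanly, with no need to enqueue one undef per thread.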

Upvotes: 2

Sobrique

Reputation: 53478

Segfaults are very rare as a result of pure Perl code. They're memory-related, and usually mean a problem in external binaries (I'd be betting on DBI here).

Threads in particular have a lot of legacy issues, though they're getting better in newer versions of Perl. I would strongly recommend upgrading to a recent version of Perl if you haven't already. I know that isn't always an option, but it's a good idea.

It's really hard to second-guess your problem, as I don't have your database, so I can't recreate it.

I would suggest that, generally, there are a few things you can do to keep threads 'clean'. The way your code works, the DB handles are in scope within the threads; I would avoid doing that. Declare the thread sub right at the top, with as narrow a scope as possible.

I will note though:

push ( my @all, $url ); 

probably doesn't do what you think it does!
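The reason: my @all inside the loop declares a brand-new, empty array on every iteration, so each @all holds a single URL and is then thrown away. Compare:

# A fresh @all each pass -- it never accumulates anything:
for my $url (@list) {
    push( my @all, $url );
}

# Declared once outside the loop, it grows as intended:
my @all;
for my $url (@list) {
    push @all, $url;
}

(@list stands in for your loop over the database rows.)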

But yes, taking your code I'd put it like this:

#!/usr/bin/perl
use strict;
use warnings;

use threads;
use Thread::Queue;
use DBI;
use LWP;

# Connection details as in the question; set these for your environment:
my ( $database, $server, $username, $password );

my $num_threads = 10;

my $work_q = Thread::Queue->new();

sub worker {
    my $ua = LWP::UserAgent->new;
    $ua->timeout(500);    # generous timeout while testing.
    while ( my $task = $work_q->dequeue ) {
        my $response = eval { $ua->get($task)->status_line };
        print "$task --> $response";
    }
}


## fetch_list

sub fetch_url_list {
    my $dbh = DBI->connect( "DBI:mysql:$database;host=$server",
        $username, $password )    # Get the rows from database
        || die "Could not connect to database: $DBI::errstr";

    my $sth =
        $dbh->prepare( 'select cname,url,xpath,region from competitors'
        )                         #query to select required fields
        || die "$DBI::errstr";

    $sth->execute();


    if ( $sth->rows < 0 ) {
        print "Sorry, no domains found.\n";
    }
    else {
        while ( my $results = $sth->fetchrow_hashref ) {
            my $competitor = $results->{cname};
            my $url        = $results->{url};
            my $xpath      = $results->{xpath};
            my $region     = $results->{region};

            $work_q->enqueue($url);
        }
    }
    $sth->finish;
    $dbh->disconnect;
}

for ( 1 .. $num_threads ) {
    threads->create( \&worker );
}

fetch_url_list();
$work_q->end;

foreach my $thr ( threads->list() ) {
    $thr->join();
}

This way, none of your threads have the DB stuff in scope, and the DB code has no thread stuff in scope. That reduces the odds of 'pollution' causing you problems. In particular, when threads start they 'copy' everything in the current scope, which can do really weird things when objects (such as DB handles) are involved.
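Here's a tiny demonstration of that cloning, using a plain hashref as a stand-in for a handle object (a real DBI handle wraps C-level state, which is exactly where a cloned copy goes wrong):

use strict;
use warnings;
use threads;

my $handle = { id => 42 };    # stand-in for a DB handle
threads->create(sub {
    # This $handle is a clone made at thread creation, not the original:
    print "in thread: $handle\n";    # a different HASH(0x...) address
})->join;
print "in parent: $handle\n";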

Failing that, I would consider looking at a 'forking' approach. Threads are good at passing data back and forth, but forks are generally more efficient (definitely on Unix-based systems) when you don't need to pass data back and forth, and you don't: you're just running a test and printing a result.
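For comparison, a minimal forking sketch using Parallel::ForkManager from CPAN, assuming a hypothetical fetch_url_list() variant that returns the URLs instead of enqueueing them:

use strict;
use warnings;
use LWP::UserAgent;
use Parallel::ForkManager;    # CPAN module, not core Perl

my @URLs = fetch_url_list();                  # hypothetical: returns the URL list
my $pm   = Parallel::ForkManager->new(10);    # at most 10 children at a time

for my $url (@URLs) {
    $pm->start and next;    # parent forks a child and moves on
    my $ua = LWP::UserAgent->new( timeout => 5 );
    print "$url --> ", $ua->get($url)->status_line, "\n";
    $pm->finish;            # child exits here
}
$pm->wait_all_children;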

Upvotes: 0

nlu

Reputation: 1963

You should declare your URL array outside of the while loop, push the URLs onto it inside the loop, then close the loop and go on:

my $dbh = DBI->connect("DBI:mysql:$database;host=$server", $username, $password) # Get the rows from database
    || die "Could not connect to database: $DBI::errstr";

my $sth = $dbh->prepare('select cname,url,xpath,region from competitors')    #query to select required fields
    || die "$DBI::errstr";

$sth->execute();

# >> declare your URL-array before starting to fetch
my @URLs;
if ($sth->rows < 0) {
    print "Sorry, no domains found.\n";
}
else {
    while (my $results = $sth->fetchrow_hashref) {
        my $competitor= $results->{cname};                      
        my $url = $results->{url};                         
        my $xpath = $results->{xpath};
        my $region = $results->{region};

        push(@URLs,$url);   
    }

}  

$sth->finish;
$dbh->disconnect;

use constant THREADS => 10;
my $queue = Thread::Queue->new();
my @threads;

for (1..THREADS) {
    push @threads, threads->create(sub {
        my $ua = LWP::UserAgent->new;
        $ua->timeout(500); # generous timeout while testing.
        while(my $task = $queue->dequeue) {
            my $response = eval{ $ua->get($task)->status_line };
            print  "$task --> $response";
        }
    });
}

$queue->enqueue( @URLs);
$queue->enqueue(undef) for 1..THREADS;
# ... here work is done
$_->join foreach @threads;

Upvotes: 0
