- 论坛徽章:
- 1
|
本帖最后由 laolun 于 2015-08-30 13:48 编辑
现在在做一个项目,需要采集大量的网页,从其中提取信息。
我打算用Coro来实现,但是用的过程中,会出现假死的情况。不是因为内存或者CPU的原因,这个程序不占CPU或者内存,跟网速有关系。网络情况好的时候,几乎不会出现,反之就经常出现。已经写了一个线程来监控其他线程超时的问题,但是问题依然存在。可能我代码写的有问题,大家帮忙看看。谢谢啦!- use strict;
- use warnings;
- use Coro;
- use Coro::Timer;
- use Coro::LWP;
- use LWP::UserAgent;
- use LWP::ConnCache;
- use HTTP::Cookies;
- use Time::HiRes;
- use JSON;
- use Data::Dumper;
- my @urls=();
- open(my $in, "todo.txt") or die $!;
- while(<$in>){
- chomp $_;
- push(@urls, $_);
- }
- close($in);
- print scalar(@urls)." pages\n";
- my $MAX_THREADS = 10;
- my @data_queue;
- my @result_queue;
- my $processing_count = 0;
- foreach (1 .. 10)
- {
- async(\&thread_io);
- }
- # 监视是否线程超时
- async{
- while(1){
- Coro::Timer::sleep 10;
- my @coros=Coro::State::list;
- #print Dumper(\@coros);
- my @lwp_coro = grep { $_->desc eq "LWP" } @coros;
- warn sprintf "%s lwp coro found.", scalar @lwp_coro;
- for my $coro (@lwp_coro) {
- my $now=time();
- if ($now > $coro->{timeout_at}) {
- $coro->cancel("timeout");
- if(@data_queue){
- async(\&thread_io);
- }
- }
- }
- if(@lwp_coro==0){
- exit;
- }
- }
- };
- for (@urls)
- {
- if ($#data_queue > $MAX_THREADS * 2)
- {
- Coro::Timer::sleep(0.02);
- redo;
- }
- push(@data_queue, $_);
- if ($#result_queue > -1)
- {
- # warn $#result_queue;
- while ($#result_queue > -1)
- {
- my $result = shift(@result_queue);
- # print "$result\n";
- append("content.txt", $result."\n");
- }
- }
- }
- while ($processing_count > 0 or $#data_queue > -1 or $#result_queue > -1)
- {
- while ($#result_queue > -1)
- {
- my $result = shift(@result_queue);
- # print "$result\n";
- append("content.txt", $result."\n");
- }
-
- Coro::Timer::sleep(0.02);
- }
- print "done";
- <STDIN>;
- ##########################################################################
- sub thread_io{
- my $lwp = LWP::UserAgent->new();
- $lwp->agent("Mozilla/5.0 (Windows NT 6.1; rv:36.0) Gecko/20100101 Firefox/36.0");#设置浏览器属性
- my $conncache = new LWP::ConnCache;
- $lwp->conn_cache($conncache);
- my $cookie_jar = HTTP::Cookies->new(
- file => "cookies",
- autosave => 1,
- );
- $lwp->cookie_jar($cookie_jar);
- $Coro::current->desc("LWP");
- $lwp->timeout(60);
- coro_timeout(60);
- while (1)
- {
- if ($#data_queue == -1)
- {
- Coro::Timer::sleep(1);
- next;
- }
- my $data = shift(@data_queue);
- print $data."\t";
- ++$processing_count;
- my $r = $lwp->get($data);
- my $status = $r->status_line();
- my $c=filter($r->content());
- # push(@result_queue, "$Coro::current $data $status");
- if($r->is_success){
- print "success\n";
- push(@result_queue, "$data\t".dosth($c));
- }else{
- print "failed\n";
- append("failed.txt", $data."\n");
- }
- --$processing_count;
- }
- }
- sub coro_timeout {
- my $timeout = shift;
- my $coro = $Coro::current;
- $coro->{timeout_at} = Time::HiRes::time() + $timeout;
- $coro->on_destroy(sub {
- my $message = shift;
- warn sprintf "coro:%s cancel because %s", $coro->desc, $message;
- });
- }
- sub dosth{
- my ($c)=@_;
- # 很简单的字符串处理,省略掉了。
- return 1;
- }
- sub append{
- my($fn,$c)=@_;
- open(my $out, ">>$fn") or die $!;
- print $out $c;
- close($out);
- }
- sub filter{
- my ($str)=@_;
- $str=~s/^\s+//;
- $str=~s/\s+$//;
- $str=~s/\s+/ /igs;
- return $str;
- }
复制代码 |
|