免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
1234下一页
最近访问板块 发新帖
查看: 42905 | 回复: 33
打印 上一主题 下一主题

用多线程和持续连接实现高速WEB请求 [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2010-06-16 01:27 |只看该作者 |倒序浏览
我经常需要提取大量的(1500页以上)网页数据,曾尝试过很多方法,虽然都能实现,但效率都不是太高。

刚开始用LWP::Simple(get)按顺序边下载边提取,这种方法很容易控制,也很可靠,下载中途中断了可以通过检查数据的完整性断点续传,下载的网页数据并不存入本地硬盘,仅存储提取后的少量数据,硬盘操作少,但下载效率很低;

为了加快下载速度,考虑用第三方下载软件先下载网页数据,再用程序从已下载的网页中提取数据。所以开始使用Teleport下载网页数据,Teleport支持多线程下载,速度提高了至少3-5倍。但用Teleport下载大量网页的时候,也会有下载失败的情况,程序本身并不自动检测并重新下载,通常需要手工重复下载一次以检验数据的完整性,这需要额外消耗一些时间。这种方法效率较高,也很稳定,我使用了很长时间;

后来在CU论坛看到仙子发的多线程模型,就将自己的代码改造成多线程下载,效率比Teleport快了不少,但仙子给的多线程模型很难控制,下载过程中经常发呆很久,下载失败率很高(10%左右),下载失败的任务无法再继续通过多线程下载(我没找到方法),不能即时显示下载进度,在下载后期经常假死,处于无限期等待状态,程序不再继续运行,不得不手工关闭。仙子发的多进程模型也尝试过,多线程中的问题,多进程同样存在,而且资源消耗非常大。所以不得不放弃,曾对PERL的多线程和多进程不抱希望;

再后来,Perl China官方QQ群群主莫言给了个终极解决方案,使用LWP::ConnCache建立持续连接,并结合多线程(线程池方式)实现高速WEB数据请求。这种方案非常高效,下载速度是Teleport的3-5倍,而且易于控制,能即时显示下载进度。

现共享给大家,以求共同进步。

(以下代码以请求1000次百度主页为例,下载测试环境为:XP,ActivePerl 5.10.1007,1M电信宽带,测试数据仅供参考)

1.顺序请求,不使用持续连接,程序每请求一次需要与服务器新建一次连接,这会耗费大量时间,平均下载速度约为每秒0.7次;
  1. #!/usr/bin/perl

  2. use strict;
  3. use warnings;
  4. use LWP::UserAgent;
  5. use Benchmark;
  6. my $TT0 = new Benchmark;

  7. my $url = "http://www.baidu.com";
  8. my $request_times = 1000;

  9. print "\n Now begin testing ... \n";
  10. my $lwp = new LWP::UserAgent(agent => 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 2.0.50727; CIBA)');

  11. for(1..$request_times) {
  12.         my $request = HTTP::Request->new(GET=>$url);
  13.         $request->header(Accept=>'text/html');
  14.         my $response = $lwp->request($request);
  15.          if ($response->is_success) {
  16.                  print " $_\tOK!\n";
  17.          }
  18.          else {
  19.                  print " $_\tFaild!\n";
  20.                  redo;
  21.          }
  22. }
  23. my $TT1 = new Benchmark;
  24. my $td = Benchmark::timediff($TT1, $TT0);
  25. $td = Benchmark::timestr($td);
  26. my ($sec) = ($td =~ /(\d+).*/);
  27. my $speed = sprintf("%0.1f",$request_times/$sec);
  28. print "\n Time expend: $td\n Average Speed: $speed Times Per Second\n\n Press Enter to close me ... \7";

  29. <STDIN>;
复制代码
2.顺序请求,使用持续连接,向同一服务器多次发送请求仅需建立一次连接,平均下载速度约为每秒7.2次,下载效率有明显提高;
  1. #!/usr/bin/perl

  2. use strict;
  3. use warnings;
  4. use LWP::UserAgent;
  5. use LWP::ConnCache;
  6. use Benchmark;
  7. my $TT0 = new Benchmark;

  8. my $url = "http://www.baidu.com";
  9. my $request_times = 1000;

  10. print "\n Now begin testing ... \n";
  11. my $lwp = new LWP::UserAgent(agent => 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 2.0.50727; CIBA)');
  12. my $conncache = new LWP::ConnCache;
  13. $lwp->conn_cache($conncache);

  14. for(1..$request_times) {
  15.         my $request = HTTP::Request->new(GET=>$url);
  16.         $request->header(Accept=>'text/html');
  17.         my $response = $lwp->request($request);
  18.          if ($response->is_success) {
  19.                  print " $_\tOK!\n";
  20.          }
  21.          else {
  22.                  print " $_\tFaild!\n";
  23.                  redo;
  24.          }
  25. }
  26. my $TT1 = new Benchmark;
  27. my $td = Benchmark::timediff($TT1, $TT0);
  28. $td = Benchmark::timestr($td);
  29. my ($sec) = ($td =~ /(\d+).*/);
  30. my $speed = sprintf("%0.1f",$request_times/$sec);
  31. print "\n Time expend: $td\n Average Speed: $speed Times Per Second\n\n Press Enter to close me ... \7";

  32. <STDIN>;
复制代码
3.使用多线程,并结合持续连接。平均下载速度约为每秒17次,是令人兴奋的质的飞跃。
  1. #!/usr/bin/perl

  2. use strict;
  3. use warnings;
  4. use threads;
  5. use threads::shared;
  6. use Thread::Queue;
  7. use LWP::UserAgent;
  8. use LWP::ConnCache;
  9. use Benchmark;
  10. my $TT0 = new Benchmark;

  11. my $url = "http://www.baidu.com";
  12. my $request_times = 1000;

  13. print "\n Now begin testing ... \n";
  14. my $lwp = new LWP::UserAgent(agent => 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 2.0.50727; CIBA)');
  15. my $conncache = new LWP::ConnCache;
  16. $lwp->conn_cache($conncache);

  17. my $data_queue = new Thread::Queue;
  18. my $result_queue = new Thread::Queue;
  19. my $processing_count :shared = 0;
  20. my $MAX_THREADS = 10;
  21. my $num = 1;

  22. for (my $n = 0; $n < $MAX_THREADS; $n++)
  23. {
  24.         threads->create(\&thread_io);
  25. }

  26. foreach my $data(1..$request_times)
  27. {
  28.         if ($data_queue ->pending() > $MAX_THREADS * 2)
  29.         {
  30.                 select(undef, undef, undef, 0.02);
  31.                 redo;
  32.         }

  33.         $data_queue->enqueue($data);
  34.         if ($result_queue->pending() > 0)
  35.         {
  36.                 while (my $result = $result_queue->dequeue_nb())
  37.                 {
  38.                         if($result) { print " $num\tOK!\n"; }
  39.                         else { print " $num\tFailed!\n"; }
  40.                          $num++;
  41.                 }
  42.         }
  43. }

  44. while ($processing_count > 0 or $data_queue->pending() > 0 or $result_queue->pending() > 0)
  45. {
  46.          select(undef, undef, undef, 0.02);
  47.         while (my $result = $result_queue->dequeue_nb())
  48.         {
  49.                         if($result) { print " $num\tOK!\n"; }
  50.                         else { print " $num\tFailed!\n"; }
  51.                          $num++;
  52.         }
  53. }

  54. foreach my $thread (threads->list(threads::all))
  55. {
  56.         $thread->detach();
  57. }

  58. my $TT1 = new Benchmark;
  59. my $td = Benchmark::timediff($TT1, $TT0);
  60. $td = Benchmark::timestr($td);
  61. my ($sec) = ($td =~ /(\d+).*/);
  62. my $speed = sprintf("%0.1f",$request_times/$sec);
  63. print "\n Time expend: $td\n Average Speed: $speed Times Per Second\n\n Press Enter to close me ... \7";

  64. <STDIN>;

  65. ##########################################################################################

  66. sub thread_io()
  67. {
  68.         while (my $data = $data_queue->dequeue())
  69.         {
  70.                 {
  71.                         lock $processing_count;
  72.                         ++$processing_count;
  73.                 }

  74.                 my $result = get_html($data);
  75.                 $result_queue->enqueue($result);

  76.                 {
  77.                         lock $processing_count;
  78.                         --$processing_count;
  79.                 }
  80.         }
  81. }

  82. sub get_html {
  83.         my $no = shift;
  84.         my $request = HTTP::Request->new(GET=>$url);
  85.         $request->header(Accept=>'text/html');
  86.         my $response = $lwp->request($request);
  87.          if ($response->is_success) {
  88.                  return(1);
  89.          }
  90.          else {
  91.                 $data_queue->enqueue($no);   #请求失败的任务,可以在此重新加入队列
  92.                  return(0);
  93.          }
  94. }
复制代码
该多线程模型为莫言原创,采用线程池方式,建2个队列,一个负责向线程队列添加任务,另一个负责管理任务的处理结果。请求失败的任务,可以重新加入队列,以保证每个请求的有效性。该模型易于控制,可靠性高,能即时显示任务的处理进度,而且效率高。

注意:LWP::ConnCache不支持LWP::Simple,支持LWP::UserAgent。

在此特别感谢莫言。

论坛徽章:
0
2 [报告]
发表于 2010-06-16 10:07 |只看该作者
不错,收藏并研究中。。。

论坛徽章:
0
3 [报告]
发表于 2010-06-16 15:02 |只看该作者
但仙子给的多线程模型很难控制,下载过程中经常发呆很久

那只是个模型而已的。。
产品级线程/进程应用,不是那几行代码就可以的。

论坛徽章:
0
4 [报告]
发表于 2010-06-16 20:35 |只看该作者
本帖最后由 黑色阳光_cu 于 2010-06-16 22:14 编辑

CORO版本的线程池

  1. #!/bin/env perl

  2. use strict;
  3. use warnings;
  4. use Benchmark;
  5. use Coro;
  6. use Coro::Timer;
  7. use Coro::LWP;
  8. use LWP::UserAgent;
  9. use LWP::ConnCache;

  10. my $MAX_THREADS = 10;
  11. my @data_queue;
  12. my @result_queue;
  13. my $processing_count = 0;

  14. my $TT0 = new Benchmark;

  15. foreach (1 .. 10)
  16. {
  17.         async(\&thread_io);
  18. }

  19. foreach (1 .. 1000)
  20. {
  21.         if ($#data_queue > $MAX_THREADS * 2)
  22.         {
  23.                 Coro::Timer::sleep(0.02);
  24.                 redo;
  25.         }

  26.         push(@data_queue, $_);
  27.         if ($#result_queue > -1)
  28.         {
  29.                 warn $#result_queue;
  30.                 while ($#result_queue > -1)
  31.                 {
  32.                         my $result = shift(@result_queue);
  33.                         print "$result\n";
  34.                 }
  35.         }
  36. }

  37. while ($processing_count > 0 or $#data_queue > -1 or $#result_queue > -1)
  38. {
  39.         while ($#result_queue > -1)
  40.         {
  41.                 my $result = shift(@result_queue);
  42.                 print "$result\n";
  43.         }
  44.        
  45.         Coro::Timer::sleep(0.02);
  46. }

  47. my $TT1 = new Benchmark;
  48. my $td = Benchmark::timediff($TT1, $TT0);
  49. $td = Benchmark::timestr($td);
  50. print "\n Time expend: $td\n\n Press Enter to close me ... \7";
  51. <STDIN>;

  52. ##########################################################################

  53. sub thread_io()
  54. {
  55.         my $lwp = LWP::UserAgent->new();
  56.         my $conncache = new LWP::ConnCache;
  57.         $lwp->conn_cache($conncache);

  58.         while (1)
  59.         {
  60.                 if ($#data_queue == -1)
  61.                 {
  62.                         Coro::Timer::sleep(1);
  63.                         next;
  64.                 }

  65.                 my $data = shift(@data_queue);
  66.                 ++$processing_count;
  67.                 my $r = $lwp->get("http://www.baidu.com/");
  68.                 my $status = $r->status_line();
  69.                 push(@result_queue, "$Coro::current $data $status");
  70.                 --$processing_count;
  71.         }
  72. }

复制代码

论坛徽章:
0
5 [报告]
发表于 2010-06-17 09:19 |只看该作者
恩 学习下

论坛徽章:
0
6 [报告]
发表于 2010-06-17 09:51 |只看该作者
Perl China官方交流群59785891

此莫言是这个群的莫言吗?

论坛徽章:
0
7 [报告]
发表于 2010-06-17 11:25 |只看该作者
Coro直接带 Coro::Semaphore; 线程池 不用自己写
  1. use strict;
  2. use warnings;
  3. use Smart::Comments;

  4. use Coro;
  5. use Coro::LWP;
  6. use Coro::Semaphore;
  7. use LWP::UserAgent;
  8. use LWP::ConnCache;
  9. use constant LIMIT => 200;

  10. use Benchmark;
  11. my $TT0 = new Benchmark;
  12. my $request_times = 1000;
  13. print "\n Now begin testing ... \n";

  14. my $outpath = './data';
  15. my $url = 'http://www.baidu.com';

  16. my $semaphore = Coro::Semaphore->new(LIMIT);
  17. my (@coros,@error);

  18. my $ua    = new LWP::UserAgent;
  19. my $conncache = new LWP::ConnCache;
  20. $ua->conn_cache($conncache);

  21. for my $num (0..999) {
  22.         my $outfile = $outpath.'/'.$num.'.html';
  23.         my $outfile_tmp = $outfile.'.tmp';
  24.     push @coros, async {
  25.                my $guard = $semaphore->guard;
  26.        
  27.                 my $req = HTTP::Request->new(GET => $url);
  28.                 my $res = $ua->request($req);
  29.                                                                                                        
  30.                    if($res->is_success){
  31.                            my $html = $res -> content;
  32.                            open(FF,">",$outfile_tmp);
  33.                            print FF $html;
  34.                            close(FF);
  35.                           
  36.                            if(-e $outfile_tmp){
  37.                                    unlink $outfile;
  38.                                    rename $outfile_tmp,$outfile;
  39.                            }
  40.                            print $num,'ok',$/;
  41.                    }else{
  42.                            push @error,$url;
  43.                    }
  44.         };
  45. }
  46.        
  47. $_ -> join for @coros;
  48.        
  49. my $TT1 = new Benchmark;
  50. my $td = Benchmark::timediff($TT1, $TT0);
  51. $td = Benchmark::timestr($td);
  52. my ($sec) = ($td =~ /(\d+).*/);
  53. my $speed = sprintf("%0.1f",$request_times/$sec);
  54. print "\n Time expend: $td\n Average Speed: $speed Times Per Second\n\n Press Enter to close me ... \7";

复制代码

论坛徽章:
0
8 [报告]
发表于 2010-06-17 11:51 |只看该作者
本帖最后由 iamlimeng 于 2010-06-17 13:06 编辑
Perl China官方交流群59785891

此莫言是这个群的莫言吗?
xuri0419 发表于 2010-06-17 09:51



    正是!

论坛徽章:
0
9 [报告]
发表于 2010-06-17 12:00 |只看该作者
Coro直接带 Coro::Semaphore; 线程池 不用自己写
hitsubunnu 发表于 2010-06-17 11:25





没看明白~~~

论坛徽章:
0
10 [报告]
发表于 2010-06-17 13:43 |只看该作者
本帖最后由 lucash 于 2010-06-18 11:13 编辑

回复 4# 黑色阳光_cu


   
哈哈!.
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP