免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 19883 | 回复: 9
打印 上一主题 下一主题

POE学习笔记 [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2007-03-13 17:53 |显示全部楼层 |倒序浏览
在程序需要同时处理大量连接的情况下,比如服务器程序、spider程序等,一般可以采用多进程、多线程和非阻塞IO三种方式。我自己编程只喜欢用非阻塞IO。在C下面有libevent库可以用,相比较POE是一款高端产品,刚开始有一点摸不着边际,熟悉之后感觉还是很贴心的。
    POE主要分以下几个组件Kernel、Session、Wheel、Filter、Driver,还有更高级的Component组件,不过基本上是前面几种组件的组合。Kernel是POE核心,内部实现了IO读写信号回调等处理,简单应用程序与Kernel交互并不多。Session是一个处理线程,比如一个服务器程序每一个客户端连接就应该对应于一个Session,同理Spider程序中对于每一个web服务器的连接也应该对应于一个Session,一个应用程序可以有很多Session。Wheel、Filter、Driver是对底层IO的封装。
    来看一个简单的客户端的例子:
  1. use warnings;
  2. use strict;

  3. use POE;
  4. use POE::Wheel::SocketFactory;
  5. use POE::Wheel::ReadWrite;

  6. POE::Session->create
  7.         ( inline_states =>
  8.                 { _start => \&start,
  9.                   connected => \&connected,
  10.                   flushed => \&flushed,
  11.                 }
  12.         );

  13. POE::Kernel->run;       
  14.        
  15. sub start {
  16.         print "_start\n";
  17.         my $wheel = POE::Wheel::SocketFactory->new
  18.                 ( RemoteAddress => 'localhost',
  19.                   RemotePort => 8000,
  20.                   SuccessEvent => "connected",
  21.                   FailureEvent => "_stop" ,
  22.                 );
  23.         $_[HEAP]->{wheel} = $wheel;
  24. }

  25. sub connected {
  26.         print "connected\n";
  27.         my ($kernel, $heap, $socket) = @_[KERNEL, HEAP, ARG0];
  28.         my $wheel = POE::Wheel::ReadWrite->new
  29.                 ( Handle => $socket,
  30.                FlushedEvent => 'flushed',
  31.                 );
  32.         $heap->{wheel} = $wheel;
  33.         $wheel->put("hello server");
  34. }

  35. sub flushed {
  36.         print "flushed\n";
  37.         delete $_[HEAP]->{wheel};
  38. }

复制代码

    打开两个终端,在其中一个输入nc -l -p 8000,另一个终端中执行上面的程序,可以看到在nc终端中输出了hello server,在客户端终端中输出了_started、connected、flushed三行信息。
        这个程序基本上可以分为三部分
        1) POE::Session->create,创建一个Session。
        2) POE::Kernel->run, 启动框架消息分发。
        3) sub start ... 定义各种状态的回调函数。
在创建一个Session时,最重要的参数是inline_states,指定各种状态的回调函数。其中以_字符开始的状态是POE系统定义状态,其他是应用程序自定义状态。POE启动后就会自动将系统中每一个Session的状态设为_start,因此_start对应的处理函数就会被调用,因此下面的代码:
  1. use POE;

  2. POE::Session->create
  3.         ( inline_states =>
  4.                 { _start => sub { print "hello world\n"; }
  5.                 }
  6.         );
  7.        
  8. POE::Kernel->run;
复制代码

会直接打印hello world后退出。
    接着看上面的例子中的_start状态的处理函数,其中的代码
      my $wheel = POE::Wheel::SocketFactory->new
   创建了一个Wheel::SocketFactory,这个Wheel会根据你指定的参数去连接远程的服务,在连接完成后,自动触发SuccessEvent参数指定的状态(上面的程序是connected状态)。
    在connected状态的处理函数中,
      my $wheel = POE::Wheel::ReadWrite->new
    创建了一个Wheel::ReadWrite,在构造函数中你可以指定可读、出错和所有数据已经发送三种情况的对应状态,然后你就可以在对应状态的处理函数中处理各种情况了。
    上面的程序通过指定FlushedEvent,标明了当所有数据被发送后这个Session会被触发到flushed状态。
   (简单总结一下,对应Wheel的参数一般是当某种情况发生时,所属的Session的应该被置为什么状态,而Session的参数主要是各种状态的处理函数。)
    在各个回调函数中,你可以通过@_[KERNEL]得到系统Kernel对象,通过@_[HEAP]得到Session相关数据对象(比如在服务器程序每个Session中就会保留这个客户端的相关信息,地址用户名等),其他不同的处理函数得到的额外参数个数不同,分别可以通过@_[ARG0]、@_[ARG1]、@_[ARG2]得到。比如connected的处理函数就通过@_[ARG0]得到了SocketFactory创建的socket对象。
    下面这一据代码:
      $heap->{wheel} = $wheel;
    有着特殊的意义,当一个Wheel被创建后,应用程序必须把它保存在某处,否则当离开作用域对象被注销后,所有的功能都无法实现了。保存在Session相关数据HEAP中是一个非常自然的选择。类似于flushed处理函数中的代码:
      delete $_[HEAP]->{wheel};
    如果不执行这一句,Wheel就永远存在,程序也就无法退出了。
    下面给出一个简单POE应用程序的框架代码:
  1. use warnings;
  2. use strict;

  3. use POE;
  4. use POE::Wheel::Somewheel;

  5. POE::Session->create
  6.         ( inline_states =>
  7.                 { _start => sub {
  8.                         # do something initial
  9.                         # new some Wheel
  10.                         # put the wheel in HEAP
  11.                   },
  12.                   state_1 => sub {
  13.                           # handle state 1
  14.                           # new some other Wheel
  15.                   },
  16.                   state_2 => sub {
  17.                     # handle state 2
  18.                   },
  19.                 }
  20.         );

  21. POE::Kernel->run;
复制代码

论坛徽章:
0
2 [报告]
发表于 2007-03-14 13:22 |显示全部楼层
Wheel:数据传送带

对于应用程序而言,输入输出是一个非常重要而且耗时的部分。通过使用Wheel,应用程序可以方便地监控IO事件并简化对输入输出操作的编写。对于Socket通讯程序来说,ListenAccept、SocketFactory、ReadWrite三种Wheel分别对应监听端口、建立连接和传送数据三种Socket操作。
1. ListenAccept
功用:处理一个监听端口的连接事件。
事件参数:AcceptEvent,其值在有客户连接时被触发。新建立的socket通过ARG0传送给处理函数。
2. SocketFactory
功用:建立到远程的连接。
事件参数:SuccessEvent,其值在连接完成时被触发。新建立的socket通过ARG0传送给处理函数。
3. ReadWrite
功用:读写数据。
事件参数:InputEvent,其值在当有数据到达时被触发。数据通过ARG0传送给处理函数。
        FlushedEvent,其值在当所有缓冲的数据被发送出去后被触发。你可以在这个事件的处理函数中发送新数据、关闭连接等。

下面通过一个具体的聊天室server/client程序来说明Wheel的使用。客户端接收从服务端传送过来的数据,同时监控标准输入,如果有用户输入则把内容传送给服务端;服务端监听新的连接,并把每一个客户端传送过来的数据广播到所有的客户端。客户端这儿使用了一个Wheel ReadLine,它可以监控终端输入。
chats服务端
  1. use warnings;
  2. use strict;

  3. use IO::Socket;
  4. use POE qw /Wheel::ListenAccept Wheel::ReadWrite/;

  5. # 创建监听Socket及处理Session
  6. POE::Session->create
  7.         ( inline_states =>
  8.                 { _start => \&start_server,
  9.                   new_connected => \&new_connected,
  10.                   client_input => \&client_input,
  11.                 }
  12.         );

  13. POE::Kernel->run;       

  14. sub start_server {
  15.         my ($kernel, $heap) = @_[KERNEL, HEAP];
  16.         my $server = IO::Socket::INET->new
  17.                 ( LocalPort => 8000,
  18.           Listen => 16,
  19.           Reuse  => "yes",
  20.         ) or die "can't make server socket: $@\n";
  21.         
  22.         $heap->{server} = POE::Wheel::ListenAccept->new
  23.                 ( Handle => $server,
  24.                   AcceptEvent => 'new_connected',
  25.                 );
  26. }

  27. sub new_connected {
  28.         my ($heap, $client) = @_[HEAP, ARG0];
  29.         my $wheel = POE::Wheel::ReadWrite->new
  30.                 ( Handle => $client,
  31.                   InputEvent => 'client_input',
  32.                 );
  33.         # 系统中每个wheel的ID是唯一的
  34.         $heap->{client}->{ $wheel->ID } = $wheel;
  35. }

  36. sub client_input {
  37.         my ($heap, $input, $wid) = @_[HEAP, ARG0, ARG1];
  38.         # 广播数据。如果愿意,可以屏蔽掉$wid,即发送消息的客户端
  39.         map { $heap->{client}->{$_}->put( $input ) } keys %{$heap->{client}};
  40. }
复制代码

chatc客户端
  1. use warnings;
  2. use strict;

  3. use IO::Socket;
  4. use POE qw /Wheel::SocketFactory Wheel::ReadWrite Wheel::ReadLine/;

  5. POE::Session->create
  6.         ( inline_states =>
  7.                 { _start => \&start_chat,
  8.                   connected => \&connected,
  9.                   connect_fail => \&connect_fail,
  10.                   server_input => \&server_input,
  11.                   user_input => \&user_input,
  12.                 }
  13.         );
  14.        
  15. POE::Kernel->run;

  16. sub start_chat {
  17.         my $wheel = POE::Wheel::SocketFactory->new
  18.                 ( RemoteAddress => 'localhost',
  19.                   RemotePort => 8000,
  20.                   SuccessEvent => "connected",
  21.                   FailureEvent => "connect_fail",
  22.                 );
  23.         $_[HEAP]->{server} = $wheel;
  24. }       

  25. sub connected {
  26.         my ($kernel, $heap, $socket) = @_[KERNEL, HEAP, ARG0];
  27.         my $wheel = POE::Wheel::ReadWrite->new
  28.                 ( Handle => $socket,
  29.                   InputEvent => "server_input",
  30.                   ErrorEvent => "error_happened",
  31.                 );
  32.         $heap->{server} = $wheel;
  33.        
  34.         my $console = POE::Wheel::ReadLine->new
  35.                 ( InputEvent => 'user_input'
  36.                 );
  37.         # 告诉ReadLine监控终端
  38.         $console->get( 'input your message, bye to quit: ');
  39.         $heap->{console} = $console;
  40. }

  41. sub connect_fail {
  42.         delete $_[HEAP]->{server};
  43. }

  44. sub server_input {
  45.         my ($heap, $input) = @_[HEAP, ARG0];
  46.         # 如果使用print "$input\n"会搞乱终端
  47.         $heap->{console}->put( $input );
  48. }

  49. sub user_input {
  50.         my ($heap, $input) = @_[HEAP, ARG0];
  51.         if ($input =~ /(quit)|(exit)|(bye)/i) {
  52.                 delete $heap->{server};
  53.                 delete $heap->{console};
  54.                 return;
  55.         }
  56.         # 发送到服务端
  57.         $heap->{server}->put( $input );
  58.         # 继续监控终端
  59.         $heap->{console}->get( 'input your message, bye to quit: ');
  60. }
复制代码

论坛徽章:
0
3 [报告]
发表于 2007-03-14 13:44 |显示全部楼层
原帖由 flw 于 2007-3-14 13:25 发表
Wheel::ReadLine 在 windows 下好用不?


查了资料好像是不支持,它使用select监控终端。而win32的select只能用在socket上。

论坛徽章:
0
4 [报告]
发表于 2007-03-14 13:54 |显示全部楼层
原帖由 flw 于 2007-3-14 13:53 发表

因此很讨厌,我打算有空了写个 Win32 版的键盘输入轮子。


支持。

论坛徽章:
0
5 [报告]
发表于 2007-03-15 10:18 |显示全部楼层
Filter:对数据进行包装和拆解

凡是通讯都要涉及到通讯协议的制订,在POE框架中通过Filter层完成对数据的包装和拆解。最简单的Filter是Filter::Stream,它什么也不做只是简单传递原始数据,应用程序需要在事件处理函数中对消息格式进行处理。Wheel ReadWrite缺省使用Filter::Line,这是一个行消息协议处理器,凡是发送的数据都会被自动在末尾加上行分隔符,接收数据时则按进行分割。这就是上面的聊天室程序在没有设置任何消息格式却能正常通讯的原因。

我们可以设计自己的Filter,来对数据进行封装,只需要在创建Wheel ReadWrwite时设置InputFilter和OutputFilter参数即可。设计一个Filter,最重要是实现get_one_start、get_one和put三个函数。对于输出Filer,put函数接收一个数组引用的参数,这个数组里面是应用程序发送的数据包队列,需要返回一个数组引用,数组里面是封装好的数据报队列。对于输入Filter,get_one_start接收一个数组引用,数组里面包含未经处理的远程数据包队列,这个函数需要把这个数据包队列保存起来,然后其他组件可以反复调用get_one得到按照通讯协议分割好的数据包。

下面是经过修改后的聊天室客户端程序,里面包含了一个WhoSaidFilter的类。客户端在启动时接收一个命令行参数作为客户端的名字$name,当用户在终端输入消息后,它会经过WhoSaidFilter把消息$message变成"$name said: $message, IMHO\n"发送给服务端。这个通讯协议兼容Line通讯协议(都用行分割符作为数据包的结束标志),因此服务端不需要更换它的InputFilter。
  1. use warnings;
  2. use strict;

  3. use IO::Socket;
  4. use POE qw /Wheel::SocketFactory Wheel::ReadWrite Wheel::ReadLine/;

  5. my $name = shift;
  6. $name = "Anonymous" unless $name;

  7. # 设计自己Filter
  8. @WhoSaidFilter::ISA = qw (POE::Filter);

  9. sub WhoSaidFilter::new {
  10.         my $type = shift;
  11.         my %params = @_;
  12.         my $name = delete $params{Name};
  13.         return bless { name => $name }, $type;
  14. }

  15. # 重载put函数
  16. sub WhoSaidFilter::put {
  17.   my ($self, $bufs) = @_;

  18.   my @raw;
  19.   foreach (@$bufs) {
  20.     push @raw, "$self->{name} Said: $_, IMHO\n";
  21.   }

  22.   \@raw;
  23. }

  24. POE::Session->create
  25.         ( inline_states =>
  26.                 { _start => \&start_chat,
  27.                   connected => \&connected,
  28.                   connect_fail => \&connect_fail,
  29.                   server_input => \&server_input,
  30.                   user_input => \&user_input,
  31.                 }
  32.         );
  33.        
  34. POE::Kernel->run;

  35. sub start_chat {
  36.         my $wheel = POE::Wheel::SocketFactory->new
  37.                 ( RemoteAddress => 'localhost',
  38.                   RemotePort => 8000,
  39.                   SuccessEvent => "connected",
  40.                   FailureEvent => "connect_fail",
  41.                 );
  42.         $_[HEAP]->{server} = $wheel;
  43. }       

  44. sub connected {
  45.         my ($kernel, $heap, $socket) = @_[KERNEL, HEAP, ARG0];
  46.         my $wheel = POE::Wheel::ReadWrite->new
  47.                 ( Handle => $socket,
  48.                   InputEvent => "server_input",
  49.                   ErrorEvent => "error_happened",
  50.                   # 使用输出Filter
  51.                   OutputFilter => WhoSaidFilter->new( Name => $name ),
  52.                 );
  53.         # 下面这一句同时把以前的那个SocketFactory的轮子删掉
  54.         $heap->{server} = $wheel;
  55.        
  56.         my $console = POE::Wheel::ReadLine->new
  57.                 ( InputEvent => 'user_input'
  58.                 );
  59.         # 告诉ReadLine监控终端
  60.         $console->get( 'input your message, bye to quit: ');
  61.         $heap->{console} = $console;
  62. }

  63. sub connect_fail {
  64.         delete $_[HEAP]->{server};
  65. }

  66. sub server_input {
  67.         my ($heap, $input) = @_[HEAP, ARG0];
  68.         # 如果使用print "$input\n"会搞乱终端
  69.         $heap->{console}->put( $input );
  70. }

  71. sub user_input {
  72.         my ($heap, $input) = @_[HEAP, ARG0];
  73.         if ($input =~ /(quit)|(exit)|(bye)/i) {
  74.                 delete $heap->{server};
  75.                 delete $heap->{console};
  76.                 return;
  77.         }
  78.         # 发送到服务端
  79.         $heap->{server}->put( $input );
  80.         # 继续监控终端
  81.         $heap->{console}->get( 'input your message, bye to quit: ');
  82. }
复制代码

Filter的功能很强,从简单的行协议通讯、加数据包长度前缀的通讯,到各种网际协议(HTTP、FTP、Jabber、MSN)都可以设计出相应的Filter来处理。POE/Filter下面的各种各样的Filter,可以拿来学习使用。简单一点的通讯协议可以使用Filter Block,会自动在数据包前面加上长度或者根据长度读取数据。这些Filter又都有丰富的选项,可以改变处理细节。比如Filter Line可以设置行分割符,Filter Block可以设置为固定包长度通讯,或者提供自己的数据包前缀处理函数。

论坛徽章:
0
6 [报告]
发表于 2007-03-15 13:34 |显示全部楼层
执行常规任务

除了执行IO操作的Session(携带一个或者多个Wheel),应用程序肯定也需要一些执行低IO延迟的工作,比如统计运行数据、记录日志等。你可以把POE::Kernel->run那一句替换为如下代码:

while (not $done) {
        POE::Kernel->run_one_timeslice;
        # 如果运行到一定时间,统计数据
        # 如果运行到一定时间,记录日志
}

也可以创建一个Session,使用POE Kernel的alarm、delay等函数设置定期执行任务。下面的代码实现了这样一个组件,它接收一个代码块引用,以及一个时间间隔参数,当到达时间间隔后,它就会执行那个代码块,如果执行结果返回真值,它则继续设置下一个定时,否则就结束自己的工作。事实上还可以重新设计,使代码块返回下一次被触发执行的时间间隔。也可以不设置Interval,这样常规任务就会在每一次有任何事件发生时被执行。

  1. use warnings;
  2. use strict;

  3. package Component::TimerRoutine;

  4. use Carp qw /croak/;
  5. use POE;

  6. # 启动一个Session,执行定时任务
  7. sub spawn {
  8.         my $type = shift;
  9.         my %params = @_;
  10.         my $interval = delete $params{Interval};
  11.         my $workhorse = delete $params{Workhorse};
  12.         croak "Workhorse param must be a subroutine reference"
  13.                 unless ref($workhorse) eq 'CODE';
  14.         POE::Session->create
  15.                 ( inline_states => {
  16.                         _start => \&timer_routine_start,
  17.                         dowork => \&dowork,
  18.                   },
  19.                   # 在创建一个Session时,可以通过heap参数设定Session私有数据
  20.                   heap => {
  21.                           interval => $interval,
  22.                           workhorse => $workhorse,
  23.                   }
  24.                        
  25.                 );
  26.         undef;
  27. }

  28. sub timer_routine_start {
  29.         my ($kernel, $heap) = @_[KERNEL, HEAP];
  30.         if ($heap->{interval}) {
  31.                 $kernel->delay_set( 'dowork', $heap->{interval} );
  32.         } else {
  33.                 $kernel->yield( 'dowork' );
  34.         }
  35. }

  36. sub dowork {
  37.         my ($kernel, $heap) = @_[KERNEL, HEAP];
  38.         return unless $heap->{workhorse}->();
  39.         if ($heap->{interval}) {
  40.                 $kernel->delay_set( 'dowork', $heap->{interval} );
  41.         } else {
  42.                 $kernel->yield( 'dowork' );
  43.         }
  44. }


  45. package main;

  46. use POE;

  47. sub work {
  48.         print time, "\n";
  49.         return 1;
  50. }

  51. Component::TimerRoutine->spawn
  52.         ( Interval => 1,
  53.           Workhorse => \&work,
  54.         );

  55. POE::Kernel->run;
复制代码


除了在使用Wheel的情况下Session的状态被自动设置外,应用程序也可以使用POE::Kernel的函数post、yield、alarm、delay来设置某一个Session的状态。还可以使用Kernel::call直接调用某一个状态的处理函数,而不用通过消息分发机制。在对Session操作时,一个通常的做法时使用Session的别名而不是使用POE::Session->create的返回值。别名可以在创建Session后通过POE::Kernel::alias_set设定。

论坛徽章:
0
7 [报告]
发表于 2007-03-15 16:17 |显示全部楼层
Component: 组合高级应用

如果来总结一个POE Server应用程序的编写,会发现程序基本上是先建立一个Session,然后创建一个监听Socket,并把这个Socket挂载到一个ListenAccept的Wheel上来监控连接事件,在有新连接到达时,新建一个Wheel ReadWrite跟客户端通讯。如果把这些常规动作再封装起来就产生了组件。


下面的聊天室服务器程序使用Component::Server::TCP重写,从编写轮子代码的重复中解脱了出来。这个组件会为每一个新连接的客户端建立一个单独的Session,这是跟上面的服务器程序不同的地方,不过这都是在组件内实现的。

  1. use warnings;
  2. use strict;

  3. use IO::Socket;
  4. use POE qw /Component::Server::TCP/;

  5. # 客户连接表
  6. my %conntab = ();

  7. POE::Component::Server::TCP->new
  8.         ( Port => 8000,
  9.           ClientConnected => sub {
  10.                   # 登记客户
  11.                   $conntab{ $_[HEAP]->{client}->ID } = $_[HEAP]->{client};
  12.           },
  13.           ClientDisconnected => sub {
  14.                   # 注销客户
  15.                   delete $conntab{ $_[HEAP]->{client}->ID };
  16.           },
  17.           ClientInput => \&client_input,
  18.         );

  19. POE::Kernel->run;

  20. sub client_input {
  21.         my ($heap, $input, $wid) = @_[HEAP, ARG0, ARG1];
  22.         # 广播数据。如果愿意,可以屏蔽掉$wid,即发送消息的客户端
  23.         map { $conntab{$_}->put( $input ) } keys %conntab;
  24. }

  25. # 是的,这些代码实现了一个聊天室服务器
复制代码


当你要开发一个新的程序时,可以先去查看是否有满足你要求的组件,如果没有再从编写Wheel开始。cpan上POE组件从Syslog、Dirwatch到TCP、HTTP、FTP、IRC、NTTP、SOAP、Jabber、MSN,从服务器到客户端,每一个都优雅地运行POE的平台上,给进一步的编程提供了坚实的基础。说不定将来也有qq的组件。

论坛徽章:
0
8 [报告]
发表于 2007-03-15 16:18 |显示全部楼层
暂告一段落吧,码字,测试,累的吐血。

论坛徽章:
0
9 [报告]
发表于 2007-03-16 17:17 |显示全部楼层
敢问FLW老大:无人喝彩,难道就不能成为原创精华吗?

论坛徽章:
0
10 [报告]
发表于 2007-05-15 09:12 |显示全部楼层
原帖由 wsliuhao 于 2007-5-15 09:05 发表
不错,不错,一定要学学POE。
cpan上搜到了 "POE-0.9989.tar.gz"
其他的 "POE::Kernel" "POE::Session" 等也需要单独下载么?
还是装完 "POE-0.9989.tar.gz"就都有了?


应该是装完就有了,试试吧。
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP