免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 2721 | 回复: 2

利用Moose写Pileline模块 [复制链接]

论坛徽章:
0
发表于 2017-08-18 22:56 |显示全部楼层
本帖最后由 aef25u 于 2017-08-18 23:03 编辑

众所周知,Spark的ML Pipelines类库用于构建机器学习的工作流,每一个PipelineStage 都会完成一个任务,如数据集处理转化,模型训练,参数设置或数据预测等。
Spark的ML Pipelines工作流大概是这个样子的。
  1. val pipeline = new   
  2. Pipeline().setStages(Array(labelIndexer,vectorAssembler,rfClassifier,labelConverter))  
  3. val model = pipeline.fit(trainingData)
复制代码
而perl的cpan上也有个Pipeline模块,作者是用perl的旧版oop实现的,在分析了源码和个人需求后,用Moose改写出了自已的Pipeline,主要实现了类ML Pipelines的DataFrame(改用perl的Data::Table模块)与Transformer功能。以下介绍如何使用Moose写Pipeline模块。
一、UML类图设计
0uml.png

二、模块间关系说明
  • Pipeline类的dispatcher属性是PipelineDispatch的实例对象
  • PipelineDispatch类继承自ipelineBase,属性segments是利用Moose的属性委托功能实现的用于存放PipelineSegment实例对象的数组引用
  • PipelineDispatch类内,属性dfhash是利用Moose的属性委托功能实现的用于存放各PipelineSegment->dispatch()返回值的hash引用:{ref($PipelineSegment)=>$df}即(YouSegmentClassName=>$df)
  • PipelineSegment类的store属性是Pipelinestore的实例对象
  • Pipelinestore类的功能,主要是在不同的PipelineSegment间存入数据或取出数据
dispatch()是核心方法,抽象方法如下:
  • dispatch()属Pipeline类方法
  • dispatch_loop()属Pipeline类方法,是dispatch()的内置方法
    • next()属PipelineDispatch类方法
内部调用关系如下:
  1.     Pipeline->dispatch(
  2.          Pipeline->dispatch_loop(
  3.                Pipeline::Dispatch->next(
  4.                       Pipeline::Segment->prepare_dispatch(Pipeline);
  5.                       my $df = Pipeline::Segment->dispatch();
  6.               );
  7.           );
  8.     );
复制代码


注:自已写的继承自Pipeline::Segment的Segment类,即是Spark的ML Pipelines类的一个个Transformer
三、模块源码3.1Pipeline类
  1. package Pipeline;

  2. use Moose;

  3. #use namespace::clean;
  4. use Pipeline::Dispatch;

  5. has 'debug' => (
  6.     is      => 'rw',
  7.     isa     => 'Int',
  8.     default => 0,
  9. );
  10. has 'dispatcher' => (
  11.     is      => 'ro',
  12.     isa     => 'Pipeline::Dispatch',
  13.     default => sub { Pipeline::Dispatch->new(); },
  14.     handles => {
  15.         get_segment => 'get',
  16.         add_segment => 'add',
  17.         del_segment => 'delete'
  18.     }
  19. );

  20. has 'store' => (
  21.     is      => 'rw',
  22.     isa     => 'Pipeline::Store',
  23.     default => sub { Pipeline::Store->new() },
  24. );

  25. sub segments {
  26.     my $self = shift;
  27.     return $self->{dispatcher}->segments(@_);
  28. }

  29. sub dispatch {
  30.     my $self = shift;
  31.     $self->dispatch_loop();
  32.     $self->{dispatcher}->reset();
  33. }

  34. sub dispatch_loop {
  35.     my $self = shift;
  36.     $self->{dispatcher}->debug( $self->{debug} );
  37.     while ( $self->{dispatcher}->segment_available ) {
  38.         $self->{dispatcher}->next($self);
  39.     }
  40. }
  41. sub getDf {
  42.     my ($self,$segname) = @_;
  43.    
  44.     $self->{dispatcher}->getDf($segname);
  45. }

  46. #__PACKAGE__->meta->make_immutable;

  47. 1;
复制代码

3.2Pipeline:ispatch类
  1. package Pipeline::Dispatch;

  2. use Moose;
  3. use Pipeline::Store;
  4. use Pipeline::Segment;
  5. extends 'Pipeline::Base';

  6. use Data::Printer;

  7. has 'segments' => (
  8.     traits  => ['Array'],
  9.     is      => 'rw',
  10.     isa     => 'ArrayRef[Pipeline::Segment]',
  11.     default => sub { [] },
  12.     handles => {
  13.         get               => 'get',
  14.         add               => 'push',
  15.         get_next_segment  => 'shift',
  16.         delete            => 'delete',
  17.         segment_available => 'count'
  18.     }
  19. );

  20. has 'dispatched_segments' => (
  21.     is      => 'rw',
  22.     isa     => 'ArrayRef[Pipeline::Segment]',
  23.     default => sub { [] }
  24. );
  25. has 'dfhash' => (
  26.     traits  => ['Hash'],
  27.     is      => 'ro',
  28.     isa     => 'HashRef',
  29.     default => sub { {} },
  30.     handles => {
  31.         _set_opt => 'set',
  32.         getDf    => 'get',
  33.     }
  34. );

  35. sub setDf {
  36.     my ( $self, $obj, $df ) = @_;

  37.     if ( defined($obj) ) {
  38.         $self->_set_opt( ref($obj), $df );
  39.     }
  40.     return $self;

  41. }

  42. sub next {
  43.     my $self = shift;
  44.     my $pipe = shift;

  45.     my $segment = $self->get_next_segment();
  46.     $segment->prepare_dispatch($pipe);
  47.     $self->emit( "dispatching to " . ref($segment) ) if $self->debug;

  48.     my $df = $segment->dispatch();

  49.     #将segment->dispatch的返回值克隆一份保存进dfhash
  50.     if ( ref($df) eq 'Data::Table' ) {
  51.         $self->setDf( $segment, $df->clone() );
  52.     }
  53.     else {
  54.         $self->setDf( $segment, $df );
  55.         $df=undef;
  56.     }
  57.     push @{ $self->{dispatched_segments} }, $segment;
  58. }

  59. sub reset {
  60.     my $self = shift;
  61.     $self->segments( $self->{dispatched_segments} );
  62.     $self->dispatched_segments( [] );
  63. }

  64. #__PACKAGE__->meta->make_immutable;

  65. 1;
复制代码

3.3Pipeline::Base类
  1. package Pipeline::Base;

  2. use Moose;
  3. #use namespace::clean;

  4. has 'debug' => (
  5.     is      => 'rw',
  6.     isa     => 'Int',
  7.     default => 0,
  8. );

  9. sub emit {
  10.     my ( $self, $mesg ) = @_;
  11.     $self->log( $self->_format_message($mesg) ) if $self->debug;
  12. }

  13. sub log {
  14.     my ( $self, $mesg ) = @_;
  15.     print STDERR $mesg;
  16. }

  17. sub _format_message {
  18.     my ( $self, $mesg ) = @_;
  19.     my $class = ref($self);
  20.     return "[$class] $mesg\n";
  21. }

  22. #__PACKAGE__->meta->make_immutable;

  23. 1;
复制代码

3.4Pipeline::Segment类
  1. package Pipeline::Segment;

  2. use Moose;

  3. has 'store' => (
  4.     is      => 'rw',
  5.     isa     => 'Pipeline::Store',
  6.     default => sub { Pipeline::Store->new() },
  7. );

  8. sub dispatch {
  9.     my $self = shift;
  10. }

  11. sub prepare_dispatch {
  12.     my ( $self, $pipe ) = @_;
  13.     $self->store( $pipe->store );
  14. }

  15. #__PACKAGE__->meta->make_immutable;

  16. 1;
复制代码

3.5Pipeline::Store类
  1. package Pipeline::Store;

  2. use Moose;

  3. has 'storehash' => (
  4.     traits  => ['Hash'],
  5.     is      => 'ro',
  6.     isa     => 'HashRef[Object]',
  7.     default => sub { {} },
  8.     handles => {
  9.         _set_opt => 'set',
  10.         get      => 'get',
  11.     }
  12. );

  13. sub set {
  14.   my $self = shift;
  15.   my $obj  = shift;
  16.    if (defined( $obj )) {
  17.     $self->_set_opt(ref($obj),$obj);
  18.   }
  19.   return $self;
  20.   
  21. }

  22. #__PACKAGE__->meta->make_immutable;

  23. 1;
复制代码

四、Example
4.1example.pl
  1. package MyDf;
  2. use Moose;

  3. extends 'Pipeline::Segment';

  4. has 'df' => (
  5.     is  => 'rw',
  6.     isa => 'Data::Table',
  7. );

  8. package MyData;
  9. use Moose;

  10. extends 'Pipeline::Segment';

  11. has 'df' => (
  12.     is  => 'rw',
  13.     isa => 'Data::Table',
  14. );

  15. sub dispatch {
  16.     my $self = shift;
  17.     $self->store->set( MyDf->new( df => $self->{df} ) );
  18.     return $self->{df};
  19. }

  20. package MySeg1;
  21. use Moose;

  22. extends 'Pipeline::Segment';

  23. sub dispatch {
  24.     my $self = shift;
  25.     my $df   = $self->store->get('MyDf');
  26.     #MySeg1将MyDf增加了一行合计数
  27.     $df->{df}->addRow( ['合计',8,undef], 3 );
  28.     return $df->{df};
  29. }

  30. package MySeg2;
  31. use Moose;

  32. extends 'Pipeline::Segment';

  33. sub dispatch {
  34.     my $self = shift;

  35.     my $df = $self->store->get('MyDf');
  36.     #MySeg2将MyDf增加了一列总金额
  37.     $df->{df}->addCol( [100,100,100,300],"total" ,3 );
  38.     return $df->{df};
  39. }

  40. package main;
  41. use lib './lib';
  42. use Pipeline;
  43. use Data::Table;
  44. use Data::Printer;

  45. my $headers = [ 'name', 'count', 'price' ];

  46. my $rows = [ [ 'A', '1', '100' ],
  47.              [ 'B', '2', '50' ],
  48.              [ 'C', '5', '20' ]];
  49. my $df = Data::Table->new( $rows, $headers, 0 );

  50. #p $df->csv;

  51. my $pipeline = Pipeline->new();
  52. $pipeline->debug(1);

  53. my $mydata = MyData->new( df => $df );
  54. my $seg1 = MySeg1->new();
  55. my $seg2 = MySeg2->new();
  56. $pipeline->add_segment( $mydata, $seg1, $seg2 );
  57. my $production = $pipeline->dispatch();
  58. #p $pipeline->store->get('MyDf')->{df}->csv;
  59. p $pipeline->getDf("MyData")->csv;
  60. p $pipeline->getDf("MySeg1")->csv;
  61. p $pipeline->getDf("MySeg2")->csv;

  62. #Author blog:tianyv.github.io
复制代码



评分

参与人数 1信誉积分 +10 收起 理由
rubyish + 10 3Q~~

查看全部评分

求职 : 软件工程师
论坛徽章:
3
程序设计版块每日发帖之星
日期:2015-10-07 06:20:00程序设计版块每日发帖之星
日期:2015-12-13 06:20:00程序设计版块每日发帖之星
日期:2016-05-05 06:20:00
发表于 2017-08-19 02:19 |显示全部楼层
能用 Moose 写的代码,用 Ruby, Python 也能写吧。

不知道用 Java, C++ 能重写吗?

Java, C++, Golang 等静态类型语言,对数据结构有一些限制,例如数组中的元素都是同种类型,没有嵌套结构的数组。

论坛徽章:
0
发表于 2017-08-23 00:05 |显示全部楼层
Moose的Traits确实很好用啊。有时觉得Perl5是不是应该直接把Moose捆绑到core里面,从工程角度讲这样更有利于推广Perl5。然而现在CPAN上Moose/Moo/Mouse/Mo/Dios/Moxie。。。牛人们都在忙着玩轮子。。。
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP