本帖最后由 aef25u 于 2017-08-18 23:03 编辑
众所周知,Spark的ML Pipelines类库用于构建机器学习的工作流,每一个PipelineStage 都会完成一个任务,如数据集处理转化,模型训练,参数设置或数据预测等。 Spark的ML Pipelines工作流大概是这个样子的。 - val pipeline = new
- Pipeline().setStages(Array(labelIndexer,vectorAssembler,rfClassifier,labelConverter))
- val model = pipeline.fit(trainingData)
复制代码 而Perl的CPAN上也有个Pipeline模块,其作者是用Perl旧式的OOP写法实现的。在分析了其源码和个人需求后,我用Moose改写出了自己的Pipeline,主要实现了类似ML Pipelines中DataFrame(改用Perl的Data::Table模块)与Transformer的功能。以下介绍如何使用Moose编写Pipeline模块。
一、UML类图设计
二、模块间关系说明- Pipeline类的dispatcher属性是PipelineDispatch的实例对象
- PipelineDispatch类继承自PipelineBase,其属性segments是用于存放PipelineSegment实例对象的数组引用,借助Moose的属性委托功能实现
- PipelineDispatch类的属性dfhash是用于存放各PipelineSegment->dispatch()返回值的hash引用(同样借助Moose的属性委托功能实现),结构为{ref($PipelineSegment)=>$df},即(YourSegmentClassName=>$df);因此同一个类的多个Segment实例只会保留最后执行的那个的返回值
- PipelineSegment类的store属性是PipelineStore的实例对象
- PipelineStore类的功能主要是在不同的PipelineSegment之间存取数据(以对象的类名为键)
dispatch()是核心方法,相关方法如下: - dispatch()属Pipeline类方法
- dispatch_loop()属Pipeline类方法,是dispatch()的内置方法
- next()属PipelineDispatch类方法
内部调用关系如下: - Pipeline->dispatch(
- Pipeline->dispatch_loop(
- Pipeline::Dispatch->next(
- Pipeline::Segment->prepare_dispatch(Pipeline);
- my $df = Pipeline::Segment->dispatch();
- );
- );
- );
复制代码
注:自己编写的继承自Pipeline::Segment的Segment类,即相当于Spark ML Pipelines中的一个个Transformer。
三、模块源码3.1Pipeline类- package Pipeline;
use Moose;
#use namespace::clean;
use Pipeline::Dispatch;
use Pipeline::Store;    # the 'store' default below calls Pipeline::Store->new

# Pipeline is the user-facing entry point. Segments are registered through
# the delegated add_segment/get_segment/del_segment methods and are run in
# insertion order by dispatch().

# When true, the dispatcher logs each segment it dispatches.
has 'debug' => (
    is      => 'rw',
    isa     => 'Int',
    default => 0,
);

# Owns the segment queue and the per-segment results.
has 'dispatcher' => (
    is      => 'ro',
    isa     => 'Pipeline::Dispatch',
    default => sub { Pipeline::Dispatch->new() },
    handles => {
        get_segment => 'get',
        add_segment => 'add',
        del_segment => 'delete',
    },
);

# Shared store handed to every segment (via prepare_dispatch) so segments
# can pass objects to one another.
has 'store' => (
    is      => 'rw',
    isa     => 'Pipeline::Store',
    default => sub { Pipeline::Store->new() },
);

# Get, or (with an array-ref argument) set, the dispatcher's segment queue.
sub segments {
    my $self = shift;
    return $self->dispatcher->segments(@_);
}

# Run every queued segment once, in order, then re-queue them so the
# pipeline can be dispatched again. Returns the dispatcher's reset() result.
sub dispatch {
    my $self = shift;
    $self->dispatch_loop();
    return $self->dispatcher->reset();
}

# Copy the debug flag onto the dispatcher, then hand it this pipeline for
# each queued segment until the queue is empty.
sub dispatch_loop {
    my $self       = shift;
    my $dispatcher = $self->dispatcher;
    $dispatcher->debug( $self->debug );
    while ( $dispatcher->segment_available ) {
        $dispatcher->next($self);
    }
    return;
}

# Return the result recorded for segment class $segname by the last
# dispatch (undef if no segment of that class has been dispatched).
sub getDf {
    my ( $self, $segname ) = @_;
    return $self->dispatcher->getDf($segname);
}

__PACKAGE__->meta->make_immutable;
1;
复制代码
3.2Pipeline::Dispatch类- package Pipeline::Dispatch;
use Moose;
use Pipeline::Store;
use Pipeline::Segment;
extends 'Pipeline::Base';

# Pipeline::Dispatch keeps the queue of segments still to run, the list of
# segments already run in the current pass, and the value returned by each
# segment's dispatch(), keyed by the segment's class name.

# Segments waiting to run, in order; next() takes them from the front.
has 'segments' => (
    traits  => ['Array'],
    is      => 'rw',
    isa     => 'ArrayRef[Pipeline::Segment]',
    default => sub { [] },
    handles => {
        get               => 'get',
        add               => 'push',
        get_next_segment  => 'shift',
        delete            => 'delete',
        segment_available => 'count',
    },
);

# Segments already run in the current pass, in the order they ran.
has 'dispatched_segments' => (
    is      => 'rw',
    isa     => 'ArrayRef[Pipeline::Segment]',
    default => sub { [] },
);

# Result of each segment's dispatch(), keyed by ref($segment). Two segments
# of the same class share one key, so the later one's result wins.
has 'dfhash' => (
    traits  => ['Hash'],
    is      => 'ro',
    isa     => 'HashRef',
    default => sub { {} },
    handles => {
        _set_opt => 'set',
        getDf    => 'get',
    },
);

# Record $df as the result of segment $obj, keyed by its class name.
# An undefined $obj is ignored. Returns $self.
sub setDf {
    my ( $self, $obj, $df ) = @_;
    $self->_set_opt( ref($obj), $df ) if defined $obj;
    return $self;
}

# Run the next queued segment: give it the pipeline's store, call its
# dispatch(), record the result, and move the segment onto
# dispatched_segments. Does nothing (returns empty) when the queue is empty.
sub next {
    my ( $self, $pipe ) = @_;
    my $segment = $self->get_next_segment();
    return unless defined $segment;

    $segment->prepare_dispatch($pipe);
    $self->emit( "dispatching to " . ref($segment) ) if $self->debug;
    my $df = $segment->dispatch();

    # Keep a private copy of Data::Table results (including subclasses) so
    # later segments that mutate a shared table don't alter this snapshot.
    if ( blessed($df) && $df->isa('Data::Table') ) {
        $df = $df->clone();
    }
    $self->setDf( $segment, $df );

    return push @{ $self->dispatched_segments }, $segment;
}

# Re-queue segments for another pass: the ones already run go back first,
# in their original order, followed by any that never ran (so a reset in
# the middle of a pass loses nothing); the dispatched list is emptied.
# dfhash is left intact so getDf() still works after a dispatch.
# Returns the new (empty) dispatched list.
sub reset {
    my $self = shift;
    $self->segments(
        [ @{ $self->dispatched_segments }, @{ $self->segments } ] );
    return $self->dispatched_segments( [] );
}

__PACKAGE__->meta->make_immutable;
1;
复制代码
3.3Pipeline::Base类- package Pipeline::Base;
use Moose;
#use namespace::clean;

# Pipeline::Base provides a debug flag and STDERR logging helpers that
# subclasses use for tracing.

# emit() only logs while this is true.
has 'debug' => (
    is      => 'rw',
    isa     => 'Int',
    default => 0,
);

# When debug is enabled, write "[ClassName] $mesg\n" to STDERR.
sub emit {
    my ( $self, $mesg ) = @_;
    return unless $self->debug;
    return $self->log( $self->_format_message($mesg) );
}

# Print $mesg to STDERR verbatim (no newline is added).
sub log {
    my ( $self, $mesg ) = @_;
    return print {*STDERR} $mesg;
}

# Prefix $mesg with the invocant's class name and terminate it with "\n".
sub _format_message {
    my ( $self, $mesg ) = @_;
    my $class = ref($self);
    return "[$class] $mesg\n";
}

__PACKAGE__->meta->make_immutable;
1;
复制代码
3.4Pipeline::Segment类- package Pipeline::Segment;
use Moose;
use Pipeline::Store;    # the 'store' default below calls Pipeline::Store->new

# Pipeline::Segment is the base class for one pipeline step (the analogue
# of a Spark ML Transformer). Subclasses override dispatch().

# Replaced by the pipeline's shared store in prepare_dispatch().
has 'store' => (
    is      => 'rw',
    isa     => 'Pipeline::Store',
    default => sub { Pipeline::Store->new() },
);

# Override in subclasses. Whatever this returns is recorded by the
# dispatcher as this segment's result (see Pipeline->getDf). The base
# implementation does nothing and returns nothing, rather than leaking
# the segment object itself as its "result".
sub dispatch {
    return;
}

# Called by the dispatcher just before dispatch(): adopt the pipeline's
# shared store so this segment can exchange objects with the others.
sub prepare_dispatch {
    my ( $self, $pipe ) = @_;
    return $self->store( $pipe->store );
}

__PACKAGE__->meta->make_immutable;
1;
复制代码
3.5Pipeline::Store类- package Pipeline::Store;
use Moose;

# Pipeline::Store is a registry shared by the segments of one pipeline.
# Objects are stored and fetched by class name, one object per class:
# storing another object of the same class replaces the previous one.

# get($class_name) returns the object stored for that class.
has 'storehash' => (
    traits  => ['Hash'],
    is      => 'ro',
    isa     => 'HashRef[Object]',
    default => sub { {} },
    handles => {
        _set_opt => 'set',
        get      => 'get',
    },
);

# Store $obj under ref($obj). An undefined argument is ignored. Anything
# that is not a blessed object dies here with a clear message; the
# HashRef[Object] constraint would reject it anyway, less readably.
# Returns $self so calls can be chained.
sub set {
    my ( $self, $obj ) = @_;
    return $self unless defined $obj;
    confess 'Pipeline::Store->set expects a blessed object'
        unless blessed($obj);
    $self->_set_opt( ref($obj), $obj );
    return $self;
}

__PACKAGE__->meta->make_immutable;
1;
复制代码
四、Example
4.1example.pl- package MyDf;
use Moose;
extends 'Pipeline::Segment';

# MyDf is only a carrier: it wraps a Data::Table so the table can be put
# into the pipeline's Pipeline::Store, which holds objects keyed by class.
has df => (
    isa => 'Data::Table',
    is  => 'rw',
);
package MyData;
use Moose;
extends 'Pipeline::Segment';

# MyData is the first segment: it publishes its table to the shared store
# (wrapped in MyDf, so later segments can find it) and returns the table as
# its own result.
has 'df' => (
    is  => 'rw',
    isa => 'Data::Table',
);

sub dispatch {
    my $self  = shift;
    my $table = $self->df;
    $self->store->set( MyDf->new( df => $table ) );
    return $table;
}
package MySeg1;
use Moose;
extends 'Pipeline::Segment';

# MySeg1 appends a totals row to the shared table and returns the table.
sub dispatch {
    my $self  = shift;
    my $entry = $self->store->get('MyDf')
        or confess 'MySeg1: no MyDf in the store; MyData must run first';
    my $table = $entry->df;

    # Insert the totals row ('合计' = "total") at row index 3, i.e. after
    # the three sample rows; 8 is hard-coded as the sum of their 'count'
    # column (1 + 2 + 5).
    $table->addRow( [ '合计', 8, undef ], 3 );
    return $table;
}
package MySeg2;
use Moose;
extends 'Pipeline::Segment';

# MySeg2 adds a 'total' (amount) column at column index 3 of the shared
# table and returns the table.
sub dispatch {
    my $self  = shift;
    my $entry = $self->store->get('MyDf')
        or confess 'MySeg2: no MyDf in the store; MyData must run first';
    my $table = $entry->df;

    # Values are hard-coded for the sample data: count * price for each of
    # the three rows, then their sum for the totals row MySeg1 appended
    # (hence four values).
    $table->addCol( [ 100, 100, 100, 300 ], "total", 3 );
    return $table;
}
package main;
use FindBin;
use lib "$FindBin::Bin/lib";    # resolve ./lib next to this script, not the cwd
use Pipeline;
use Data::Table;
use Data::Printer;

# Sample table: three row-major records (the trailing 0 passed to
# Data::Table->new selects the row-based layout).
my $headers = [ 'name', 'count', 'price' ];
my $rows    = [
    [ 'A', '1', '100' ],
    [ 'B', '2', '50' ],
    [ 'C', '5', '20' ],
];
my $df = Data::Table->new( $rows, $headers, 0 );
#p $df->csv;

my $pipeline = Pipeline->new();
$pipeline->debug(1);

my $mydata = MyData->new( df => $df );
my $seg1   = MySeg1->new();
my $seg2   = MySeg2->new();
$pipeline->add_segment( $mydata, $seg1, $seg2 );
$pipeline->dispatch();

# The dispatcher stored a clone of each segment's Data::Table result, so
# these show the table as it was after MyData, MySeg1 and MySeg2.
#p $pipeline->store->get('MyDf')->df->csv;
p $pipeline->getDf("MyData")->csv;
p $pipeline->getDf("MySeg1")->csv;
p $pipeline->getDf("MySeg2")->csv;
#Author blog:tianyv.github.io
复制代码
|