- 论坛徽章:
- 0
|
本帖最后由 aef25u 于 2020-05-30 13:53 编辑
简介:
1、开启各15个线程的1个生产者与2个消费者A、B(AB同为耗时任务)
2、生产者按条件通过2个Channel向A与B分别发送数据(数据不需按原始顺序发送;按业务逻辑会发送不同数据,暂时按发送同一数据模拟)
3、数组@share为共享数据,消费者A、B会向其读取数据(不需写入或改变元素)
4、消费者A处理接收的数据,不符合业务逻辑的数据会继续传入B进行处理 if $v==999 {$supplierB.emit($v);}
问题:
1、这样组织代码有没有不合理的地方?
2、共享的@share数组安不安全?
3、假如在消费者A、B中使用以下cached 函数,在各自不同线程下能起作用不?
use experimental :cached;
sub fun-name( $val1, $val2) is cached {...}
- my $TIME = now;
- my $supplierA = Supplier.new;
- my $channelA = $supplierA.Supply.Channel;
- my $supplierB = Supplier.new;
- my $channelB = $supplierB.Supply.Channel;
- my @share=("C","D");
- my $threads=15;
- my (@pA,@pB);
- for 1 .. $threads {
- @pA.push: start {
- react {
- whenever $channelA -> $v {
- if $v==999 {$supplierB.emit(1000);}#向消费者B发送数据,真实情况是发送$v
- say "channelA shareArr {@share[0]}:Thread {$*THREAD.id} got $v";
- }
- }
- }
- @pB.push: start {
- react {
- whenever $channelB -> $v {
- sleep 0.5;#真实代码不需这一行,模拟单个B任务比A任务耗时
- say "channelB shareArr {@share[1]}:Thread {$*THREAD.id} got $v";
- }
- }
- }
- }
- my @promises;
- for ^1000 -> $r {
- push @promises, start {
- #sleep $r*0.001;
- sleep rand;
- $supplierA.emit($r);
- $supplierB.emit($r);
- };
- if @promises == 15 {
- await Promise.anyof(@promises);
- @promises .= grep({ !$_ });
- }
- }
- await @promises;
- $supplierA.done;
- await @pA;
- $supplierB.done;
- await @pB;
- $TIME = now - $TIME;
- say $TIME;
复制代码
自已发现了个问题,要保证A处理完不符合业务逻辑的数据能传到B不能这样写:
- $supplierA.done;
- $supplierB.done;
- await @pA,@pB;
复制代码
|
|