免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 39088 | 回复: 2
打印 上一主题 下一主题

关于perl6并发的问题 [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2020-05-29 13:24 |只看该作者 |倒序浏览
本帖最后由 aef25u 于 2020-05-30 13:53 编辑

简介:
1、开启各15个线程的1个生产者与2个消费者A、B(AB同为耗时任务)
2、生产者按条件通过2个Channel向A与B分别发送数据(数据不需按原始顺序发送;按业务逻辑会发送不同数据,暂时按发送同一数据模拟)
3、数组@share为共享数据,消费者A、B会向其读取数据(不需写入或改变元素)
4、消费者A处理接收的数据,不符合业务逻辑的数据会继续传入B进行处理  if $v==999 {$supplierB.emit($v);}
问题:
1、这样组织代码有没有不合理的地方?
2、共享的@share数组安不安全?
3、假如在消费者A、B中使用以下cached 函数,在各自不同线程下能起作用不?

     use experimental :cached;
     sub fun-name( $val1, $val2) is cached {...}


  1. my $TIME = now;
  2. my $supplierA = Supplier.new;
  3. my $channelA = $supplierA.Supply.Channel;
  4. my $supplierB = Supplier.new;
  5. my $channelB = $supplierB.Supply.Channel;
  6. my @share=("C","D");
  7. my $threads=15;
  8. my (@pA,@pB);
  9. for 1 .. $threads {

  10.   @pA.push: start {
  11.       react  {
  12.             whenever $channelA -> $v {
  13.               if $v==999 {$supplierB.emit(1000);}#向消费者B发送数据,真实情况是发送$v
  14.               say "channelA shareArr {@share[0]}:Thread {$*THREAD.id} got $v";

  15.           }
  16.      }
  17.   }
  18.   @pB.push: start {
  19.       react  {
  20.             whenever $channelB -> $v {
  21.               sleep 0.5;#真实代码不需这一行,模拟单个B任务比A任务耗时
  22.              say "channelB shareArr {@share[1]}:Thread {$*THREAD.id} got $v";
  23.           }
  24.      }
  25.   }
  26. }

  27. my @promises;
  28. for ^1000 -> $r {
  29.     push @promises, start {
  30.           #sleep $r*0.001;
  31.           sleep rand;
  32.           $supplierA.emit($r);
  33.           $supplierB.emit($r);
  34.     };
  35.     if @promises == 15 {
  36.          await Promise.anyof(@promises);
  37.          @promises .= grep({ !$_ });
  38.     }
  39. }

  40. await @promises;

  41. $supplierA.done;
  42. await @pA;
  43. $supplierB.done;
  44. await @pB;
  45. $TIME = now - $TIME;
  46. say $TIME;
复制代码

自已发现了个问题,要保证A处理完不符合业务逻辑的数据能传到B不能这样写:
  1. $supplierA.done;
  2. $supplierB.done;
  3. await @pA,@pB;
复制代码






论坛徽章:
0
2 [报告]
发表于 2020-05-30 13:50 |只看该作者
自已发现个问题,要保证A处理完不符业务逻辑的数据能传到B,不能这书写
  1. $supplierA.done;
  2. $supplierB.done;
  3. await @pA,@pB;
复制代码

而要这样写
  1. $supplierA.done;
  2. await @pA;
  3. $supplierB.done;
  4. await @pB;
复制代码
第一种写法可能出现A还没处理完$channelB就被关闭了。


论坛徽章:
0
3 [报告]
发表于 2020-06-02 13:33 |只看该作者
本帖最后由 aef25u 于 2020-06-02 16:49 编辑

通过自已上网搜索与亲自测试试验结果,初步可以回答之前提的问题,如果有不正确的地方,希望大神们指正。
1、这样组织代码有没有不合理的地方?
     这样组织可以满足我的任务,但之前认为@pA与@pB内不需要sleep是错误的。
     供应者需 sleep 0;消费者@pA与@pB分别为 sleep 0.2 sleep 0.1,这样有利于各个线程跑的任务相对均衡(因我使用的是Windows操作系统,CPU竞争策略属于抢占式)。当然 @pA设成与@pB一样也没问题,但 @pAsleep时间不能太大,否则可能出现B先结束,而@pA仍向关闭的通道B发送数据,但发送不了,任务永远不会结束。


2、共享的@share数组安不安全?
    共享的资源@share的读写均是安全的。

3、假如在消费者A、B中使用以下cached 函数,在各自不同线程下能起作用不?
     use experimental :cached;
     sub fun-name( $val1, $val2) is cached {...}
    这个对我的任务作用不大,而我的任务要求并发数大(在我的任务中我使用了100个并发线程),而分配到的各个线程的任务数小,再加上任务是随机分到各线程的,需使用cached的值的机会很小

最后,给出更完善的代码:

  1. my $TIME = now;
  2. my $supplierA = Supplier.new;
  3. my $channelA = $supplierA.Supply.Channel;
  4. my $supplierB = Supplier.new;
  5. my $channelB = $supplierB.Supply.Channel;
  6. my @share=("C","D");
  7. my $threads=15;
  8. my (@pA,@pB);
  9. for 1 .. $threads {

  10.   @pA.push: start {
  11.     my @aFir;
  12.     react  {
  13.         whenever $channelA -> $v {
  14.           sleep 0.2;
  15.           if $v % 2 {$supplierB.emit(1000);}
  16.           say "channelA shareArr {@share[0]}:Thread {$*THREAD.id} got $v";
  17.           #my $h={"Val"=>$v};
  18.           @aFir.push: $v*2;
  19.         }
  20.     }
  21.     @aFir;
  22.   }
  23.   @pB.push: start {

  24.     my @aSec;
  25.     react  {
  26.         whenever $channelB -> $v {
  27.           sleep 0.1;
  28.           say "channelB shareArr {@share[1]}:Thread {$*THREAD.id} got $v";
  29.           #my $h={"Val"=>$v};
  30.           @aSec.push:  $v*2;
  31.         }
  32.     }
  33.     @aSec;
  34.   }
  35. }
  36. sleep 1;
  37. my @promises;
  38. for ^1000 -> $r {
  39.   push @promises, start {
  40.         sleep 0;

  41.         $supplierA.emit($r);
  42.         $supplierB.emit($r);
  43.     };
  44.     if @promises == $threads {
  45.        await Promise.anyof(@promises);
  46.        @promises .= grep({ !$_ });
  47.     }
  48. }

  49. await @promises;

  50. $supplierA.done;
  51. my @retAdrgFIR=await @pA;
  52. $supplierB.done;
  53. my @retAdrgSEC=await @pB;

  54. #@retAdrgFIR .=grep({ $_});
  55. #@retAdrgSEC .=grep({ $_});
  56. dd @retAdrgFIR;
  57. dd @retAdrgSEC;

  58. #查看各线程最终分配执行的任务数
  59. my @nFIR=@retAdrgFIR.map( -> $n {$n.List.elems});
  60. my @nSEC=@retAdrgSEC.map( -> $n {$n.List.elems});
  61. dd @nFIR;
  62. dd @nSEC;

  63. $TIME = now - $TIME;
  64. say $TIME;
复制代码




您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP