免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
12下一页
最近访问板块 发新帖
查看: 4279 | 回复: 12
打印 上一主题 下一主题

[C] 如何在多线程中保证结果输出顺序与数据输入顺序一样? [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2016-08-18 11:01 |只看该作者 |倒序浏览
现假定仅一线程读取输入数据,三个(或任意个)Workers同时分别地处理数据,另一个线程输出已处理的数据,但要求其输出数据的顺序与输入数据顺序一致。为简单计,例如,若输入1,2,3,…N个整数,多线程处理后,输出的顺序仍为1,2,3,…N。

下面简单程序尝试实现以上目标,但未成功,大多数情况下,输出是正确的。有时程序冻结某点,沒有全部输出所有数据。请教各位还需要什么同步机制才能保证任何情况下全部依序输岀?谢谢!

  1. include <pthread.h>
  2. #include <stdio.h>
  3. #include <stdlib.h>
  4. #include <string.h>
  5. #include <unistd.h>
  6. #include <semaphore.h>
  7. #include <errno.h>
  8. #define check(rc, msg) { \
  9.         if (!rc) \
  10.           fprintf(stderr, "%s failed \n", msg), exit(1); \
  11. }

  12. #define dbg(...)  fprintf(stderr,__VA_ARGS__)

  13. #define NBUFF       16      /* number of slots */
  14. #define NP          1       /* number of producers */
  15. #define NC          3       /* number of consumers */

  16. int widx = 0, ridx = 0, qoidx = 0, qridx = 0;

  17. struct rec {
  18.     struct {
  19.     int    data;
  20.     size_t seq;
  21.     } buff[NBUFF];
  22.     sem_t full;           /* number of full slots  */
  23.     sem_t empty;          /* number of empty slots */
  24.     sem_t rmut;           /* mutual exclusion to shared data */
  25.     sem_t wmut;           /* mutual exclusion to shared data */
  26. } shared, sha2;

  27. size_t seq = 1;
  28. void calc(size_t lineNbr, int val) {
  29.         /* do something with.., ie. val */
  30.         usleep(rand()%307);

  31.         sem_wait(&sha2.empty);
  32.         sem_wait(&sha2.wmut);
  33.         sha2.buff[qoidx].data = val;
  34.         sha2.buff[qoidx].seq = lineNbr;
  35.         dbg("QIn : %ld -> %d \n", sha2.buff[qoidx].seq, sha2.buff[qoidx].data);
  36.         qoidx = (qoidx+1) % NBUFF;
  37.         sem_post(&sha2.full);
  38.         sem_post(&sha2.wmut);
  39. }

  40. size_t cnt = 1; int fg=0;
  41. void* seqOut(void* arg) {
  42.       for(int i=0;i<=seq && cnt<seq;i++) {
  43.          //dbg("..Loop 1.. \n");
  44.          sem_wait(&sha2.full);
  45.          fg=0;
  46.          while(!fg) {
  47.          if(sha2.buff[qridx].seq == cnt) {
  48.             fprintf(stderr,"Output : {%ld: %d}\n", cnt, sha2.buff[qridx].data);
  49.             cnt++;
  50.             fg=1;
  51.             sem_post(&sha2.empty);
  52.             //dbg("..Loop 2.. \n");
  53.          }
  54.          else {
  55.             fg=0;
  56.             qridx = (qridx+1) % NBUFF;
  57.             //dbg("..Loop 3..qridx=%d \n", qridx);
  58.           }
  59.          }
  60.          qridx = (qridx+1) % NBUFF;
  61.     }
  62. return NULL;
  63. }

  64. void *Producer(void *arg) {
  65.      int i;

  66.      for(i=1; i<=50; i++) {
  67.             /* If no empty slot, wait */
  68.             sem_wait(&shared.empty);

  69.             shared.buff[widx].data = i;
  70.             shared.buff[widx].seq = seq++;
  71.             //fprintf(stderr,"Read : [%ld -> %d] \n", shared.buff[widx].seq, shared.buff[widx].data);
  72.             widx = (widx+1)%NBUFF;

  73.             sem_post(&shared.full);
  74.        }
  75.      for(int i=0; i<NC; i++) {
  76.             sem_wait(&shared.empty);
  77.               shared.buff[widx].data = -999;
  78.               widx = (widx+1)%NBUFF;
  79.             sem_post(&shared.full);
  80.      }
  81.   return NULL;
  82. }

  83. void *Consumer(void *arg) {
  84.     int n, lineNbr;

  85.     while (1) {
  86.         sem_wait(&shared.full);
  87.         /* If another thread uses the buffer, wait */
  88.         sem_wait(&shared.rmut);

  89.         if(shared.buff[ridx].data == -999) {
  90.           ridx = (ridx+1)%NBUFF;
  91.           sem_post(&shared.empty);
  92.           sem_post(&shared.rmut);
  93.           return NULL;
  94.         }

  95.         n = shared.buff[ridx].data;
  96.         lineNbr = shared.buff[ridx].seq;

  97.         ridx = (ridx+1)%NBUFF;

  98.         sem_post(&shared.empty);
  99.         sem_post(&shared.rmut);
  100.         calc(lineNbr, n);
  101.     }

  102.     return NULL;
  103. }

  104. int main(int argc, char** argv) {
  105.     pthread_t idP[NP],    idC[NC],    seqThr;
  106.     int       indexP[NP], indexC[NC], i;

  107.     check( sem_init(&shared.full, 0, 0 )== 0, "init full err");
  108.     check( sem_init(&sha2.full, 0, 0 )== 0, "init full2 err");
  109.     check( sem_init(&shared.empty, 0, NBUFF) == 0, "init empty err");
  110.     check( sem_init(&sha2.empty, 0, NBUFF) == 0, "init empty2 err");
  111.     check( sem_init(&shared.rmut, 0, 1) == 0, "init reader mutex err");
  112.     check( sem_init(&sha2.rmut, 0, 1) == 0, "init reader2 mutex err");
  113.     check( sem_init(&shared.wmut, 0, 1) == 0, "init writer mutex err");
  114.     check( sem_init(&sha2.wmut, 0, 1) == 0, "init writer2 mutex err");
  115.     for (i = 0; i < NP; i++) {
  116.         indexP[i] = i+1;
  117.         check( pthread_create(&idP[i], NULL, Producer, &indexP[i]) == 0, "create Pth err");
  118.     }

  119.     for (i = 0; i < NC; i++) {
  120.         indexC[i] = i+1;
  121.         check( pthread_create(&idC[i], NULL, Consumer, &indexC[i]) == 0, "create Cth err");
  122.     }
  123.     check( pthread_create(&seqThr, NULL, seqOut, NULL) == 0, "create seqThr err");

  124.     for (i = 0; i < NP; i++) {
  125.         check( pthread_join(idP[i], NULL) == 0, "join Pth err");
  126.     }

  127.     for (i = 0; i < NC; i++) {
  128.         check( pthread_join(idC[i], NULL) == 0, "join Cth err");
  129.     }

  130.     check( pthread_join(seqThr, NULL) == 0, "join seqThr err");

  131.     check( sem_destroy(&shared.full) == 0, "destroy full err");
  132.     check( sem_destroy(&sha2.full) == 0, "destroy full2 err");
  133.     check( sem_destroy(&shared.empty) == 0, "destroy empty err");
  134.     check( sem_destroy(&sha2.empty) == 0, "destroy empty2 err");
  135.     check( sem_destroy(&shared.rmut) == 0, "destroy rdmutex err");
  136.     check( sem_destroy(&sha2.rmut) == 0, "destroy rdmutex2 err");
  137.     check( sem_destroy(&shared.wmut) == 0, "destroy wrmutex err");
  138.     check( sem_destroy(&sha2.wmut) == 0, "destroy wrmutex2 err");
  139.     fprintf(stderr,"all OK \n");
  140.     return 0;
  141. }
  142.                                                                                                                                     168,1         Bot
复制代码

论坛徽章:
0
2 [报告]
发表于 2016-08-18 11:03 |只看该作者
一种可能输出,正确的.
QIn : 1 -> 1
Output : {1: 1}
QIn : 2 -> 2
Output : {2: 2}
QIn : 4 -> 4
QIn : 3 -> 3
Output : {3: 3}
Output : {4: 4}
QIn : 5 -> 5
Output : {5: 5}
QIn : 6 -> 6
QIn : 7 -> 7
Output : {6: 6}
Output : {7: 7}
QIn : 8 -> 8
Output : {8: 8}
QIn : 10 -> 10
QIn : 11 -> 11
QIn : 12 -> 12
QIn : 9 -> 9
Output : {9: 9}
Output : {10: 10}
Output : {11: 11}
Output : {12: 12}
QIn : 13 -> 13
Output : {13: 13}
QIn : 16 -> 16
QIn : 14 -> 14
Output : {14: 14}
Output : {15: 15}
QIn : 15 -> 15
Output : {16: 16}
QIn : 17 -> 17
Output : {17: 17}
QIn : 20 -> 20
QIn : 18 -> 18
Output : {18: 18}
Output : {19: 19}
QIn : 19 -> 19
Output : {20: 20}
QIn : 21 -> 21
Output : {21: 21}
QIn : 23 -> 23
QIn : 24 -> 24
QIn : 22 -> 22
Output : {22: 22}
Output : {23: 23}
QIn : 27 -> 27
Output : {24: 24}
QIn : 26 -> 26
QIn : 28 -> 28
QIn : 25 -> 25
Output : {25: 25}
Output : {26: 26}
Output : {27: 27}
Output : {28: 28}
QIn : 32 -> 32
QIn : 30 -> 30
QIn : 31 -> 31
QIn : 29 -> 29
Output : {29: 29}
Output : {30: 30}
Output : {31: 31}
Output : {32: 32}
QIn : 34 -> 34
QIn : 33 -> 33
Output : {33: 33}
Output : {34: 34}
QIn : 36 -> 36
QIn : 35 -> 35
Output : {35: 35}
Output : {36: 36}
QIn : 37 -> 37
Output : {37: 37}
QIn : 38 -> 38
Output : {38: 38}
QIn : 41 -> 41
QIn : 42 -> 42
QIn : 39 -> 39
Output : {39: 39}
QIn : 40 -> 40
Output : {40: 40}
Output : {41: 41}
Output : {42: 42}
QIn : 43 -> 43
Output : {43: 43}
QIn : 44 -> 44
Output : {44: 44}
QIn : 47 -> 47
QIn : 48 -> 48
QIn : 45 -> 45
Output : {45: 45}
QIn : 46 -> 46
Output : {46: 46}
Output : {47: 47}
Output : {48: 48}
QIn : 49 -> 49
Output : {49: 49}
QIn : 50 -> 50
Output : {50: 50}
all OK

论坛徽章:
0
3 [报告]
发表于 2016-08-18 11:06 |只看该作者
另一种可能输出,不正确的.
QIn : 1 -> 1
Output : {1: 1}
QIn : 2 -> 2
Output : {2: 2}
QIn : 3 -> 3
Output : {3: 3}
QIn : 4 -> 4
Output : {4: 4}
QIn : 5 -> 5
Output : {5: 5}
QIn : 7 -> 7
QIn : 6 -> 6
Output : {6: 6}
Output : {7: 7}
QIn : 8 -> 8
Output : {8: 8}
QIn : 11 -> 11
QIn : 9 -> 9
QIn : 10 -> 10
QIn : 12 -> 12
Output : {9: 9}
Output : {10: 10}
Output : {11: 11}
Output : {12: 12}
QIn : 16 -> 16
QIn : 13 -> 13
Output : {13: 13}
Output : {14: 14}
QIn : 14 -> 14
QIn : 17 -> 17
QIn : 18 -> 18
QIn : 20 -> 20
QIn : 19 -> 19
QIn : 23 -> 23
QIn : 21 -> 21
QIn : 22 -> 22
QIn : 24 -> 24
QIn : 27 -> 27
QIn : 25 -> 25
QIn : 26 -> 26
QIn : 28 -> 28
QIn : 30 -> 30
QIn : 31 -> 31
QIn : 29 -> 29
程序冻结,沒有全部输出所有数据

论坛徽章:
15
射手座
日期:2014-11-29 19:22:4915-16赛季CBA联赛之青岛
日期:2017-11-17 13:20:09黑曼巴
日期:2017-07-13 19:13:4715-16赛季CBA联赛之四川
日期:2017-02-07 21:08:572015年亚冠纪念徽章
日期:2015-11-06 12:31:58每日论坛发贴之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-07-12 22:20:002015亚冠之浦和红钻
日期:2015-07-08 10:10:132015亚冠之大阪钢巴
日期:2015-06-29 11:21:122015亚冠之广州恒大
日期:2015-05-22 21:55:412015年亚洲杯之伊朗
日期:2015-04-10 16:28:25
4 [报告]
发表于 2016-08-18 15:09 |只看该作者
本帖最后由 yulihua49 于 2016-08-18 15:14 编辑

没办法。并行数据处理的必定无序。数据的序就靠索引,用索引定序吧。
处理结果存入数据库,排序后输出。

或者,每个数据配个顺序号,处理完了按顺序号插入map,然后遍历map输出。
如果是分布处理,数据分布在多个服务器上,只有存入数据库一种方法比较简单。

论坛徽章:
0
5 [报告]
发表于 2016-08-18 21:14 |只看该作者
不会吧?我看过 python / golang code, 他们 都做到了,而且与无序输出运行时间差很少!现只考虑在一台机上运行。C++码也欢迎!

论坛徽章:
14
水瓶座
日期:2014-06-10 09:51:0215-16赛季CBA联赛之江苏
日期:2017-11-27 11:42:3515-16赛季CBA联赛之八一
日期:2017-04-12 14:26:2815-16赛季CBA联赛之吉林
日期:2016-08-20 10:43:1215-16赛季CBA联赛之广夏
日期:2016-06-23 09:53:58程序设计版块每日发帖之星
日期:2016-02-11 06:20:00程序设计版块每日发帖之星
日期:2016-02-09 06:20:0015-16赛季CBA联赛之上海
日期:2015-12-25 16:40:3515-16赛季CBA联赛之广夏
日期:2015-12-22 09:39:36程序设计版块每日发帖之星
日期:2015-08-24 06:20:002015亚冠之德黑兰石油
日期:2015-08-07 09:57:302015年辞旧岁徽章
日期:2015-03-03 16:54:15
6 [报告]
发表于 2016-08-19 11:36 |只看该作者
本帖最后由 lxyscls 于 2016-08-19 11:41 编辑
mjus 发表于 2016-08-18 21:14
不会吧?我看过 python / golang code, 他们 都做到了,而且与无序输出运行时间差很少!现只考虑在一台机上 ...


你的N是多少?50?50和50log50可不就不差太多

map的方法前面有人讲过了,再具体一点unordered_map

worker thread 处理完后insert,key是输入序号

output thread while(1)根据序号get,get到就说明已插入,没有就retry

论坛徽章:
0
7 [报告]
发表于 2016-08-19 21:34 |只看该作者
你不要想太简单,这里只为简单示例,实际上N为千万級,或亿上大数据处理的!用map足够安全吗?关鍵怎样处理同步问题

论坛徽章:
59
2015年亚洲杯之约旦
日期:2015-01-27 21:27:392015年亚洲杯之日本
日期:2015-02-06 22:09:41拜羊年徽章
日期:2015-03-03 16:15:432015年辞旧岁徽章
日期:2015-03-03 16:54:152015年迎新春徽章
日期:2015-03-04 09:50:282015元宵节徽章
日期:2015-03-06 15:50:392015年亚洲杯之阿联酋
日期:2015-03-19 17:39:302015年亚洲杯之中国
日期:2015-03-23 18:52:23巳蛇
日期:2014-12-14 22:44:03双子座
日期:2014-12-10 21:39:16处女座
日期:2014-12-02 08:03:17天蝎座
日期:2014-07-21 19:08:47
8 [报告]
发表于 2016-08-19 22:00 |只看该作者
将输出独立出来,
用数组做参数, 一个线程处理一个数组元素。

论坛徽章:
0
9 [报告]
发表于 2016-08-19 22:07 |只看该作者
Make sense ! But How ?  please show me something magic ! Trust you ! Thank you so much !!!

论坛徽章:
14
水瓶座
日期:2014-06-10 09:51:0215-16赛季CBA联赛之江苏
日期:2017-11-27 11:42:3515-16赛季CBA联赛之八一
日期:2017-04-12 14:26:2815-16赛季CBA联赛之吉林
日期:2016-08-20 10:43:1215-16赛季CBA联赛之广夏
日期:2016-06-23 09:53:58程序设计版块每日发帖之星
日期:2016-02-11 06:20:00程序设计版块每日发帖之星
日期:2016-02-09 06:20:0015-16赛季CBA联赛之上海
日期:2015-12-25 16:40:3515-16赛季CBA联赛之广夏
日期:2015-12-22 09:39:36程序设计版块每日发帖之星
日期:2015-08-24 06:20:002015亚冠之德黑兰石油
日期:2015-08-07 09:57:302015年辞旧岁徽章
日期:2015-03-03 16:54:15
10 [报告]
发表于 2016-08-20 10:45 |只看该作者
本帖最后由 lxyscls 于 2016-08-20 10:49 编辑

回复 7# mjus


    https://msdn.microsoft.com/en-us/library/hh750089.aspx
    https://software.intel.com/en-us/node/506077

   多线程,map就没办法同步了?就算数据很多,但是只要消费掉,map还是不会太大。要求输出顺序,当然要给输入编号索引,否则怎么知道出去该谁先谁后
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP