免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 5866 | 回复: 8
打印 上一主题 下一主题

[Linux] 关于 多生产者/多消费者/缓冲区 几种模型的数个问题,请不吝赐教,谢谢! [复制链接]

论坛徽章:
14
水瓶座
日期:2014-06-10 09:51:0215-16赛季CBA联赛之江苏
日期:2017-11-27 11:42:3515-16赛季CBA联赛之八一
日期:2017-04-12 14:26:2815-16赛季CBA联赛之吉林
日期:2016-08-20 10:43:1215-16赛季CBA联赛之广夏
日期:2016-06-23 09:53:58程序设计版块每日发帖之星
日期:2016-02-11 06:20:00程序设计版块每日发帖之星
日期:2016-02-09 06:20:0015-16赛季CBA联赛之上海
日期:2015-12-25 16:40:3515-16赛季CBA联赛之广夏
日期:2015-12-22 09:39:36程序设计版块每日发帖之星
日期:2015-08-24 06:20:002015亚冠之德黑兰石油
日期:2015-08-07 09:57:302015年辞旧岁徽章
日期:2015-03-03 16:54:15
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2012-07-23 21:57 |只看该作者 |倒序浏览
本帖最后由 lxyscls 于 2012-07-30 10:09 编辑

基本模型为UNP V2上面的实现
1、1000000次写和读,从0写到1000000,同样从0读到1000000,如果读数有错误消费者报错
2、缓冲区数组大小10,循环写入,循环读出,buf[n % 10] = n,读出按同样的方式进行检查

模型A(UNPV2上面已实现,使用POSIX信号量)
  1. struct {    /* data shared by producers and consumers */
  2.   int   buff[NBUFF];
  3.   int   nput;           /* item number: 0, 1, 2, ... */
  4.   int   nputval;        /* value to store in buff[] */
  5.   int   nget;           /* item number: 0, 1, 2, ... */
  6.   int   ngetval;        /* value fetched from buff[] */
  7.   sem_t mutex, nempty, nstored;     /* semaphores, not pointers */
  8. } shared;
复制代码
nput, nputval供生产者使用,nget,ngetval供消费者使用,nempty表示缓冲区中空位置,nstored表示缓冲区中已存个数,mutex锁操作
  1.         /* 4initialize three semaphores */
  2.     Sem_init(&shared.mutex, 0, 1);
  3.     Sem_init(&shared.nempty, 0, NBUFF);
  4.     Sem_init(&shared.nstored, 0, 0);
复制代码
生产者线程函数:
  1. /* include produce */
  2. void *
  3. produce(void *arg)
  4. {
  5.     for ( ; ; ) {
  6.         Sem_wait(&shared.nempty);   /* wait for at least 1 empty slot */
  7.         Sem_wait(&shared.mutex);

  8.         if (shared.nput >= nitems) {
  9.             Sem_post(&shared.nstored);  /* let consumers terminate */
  10.             Sem_post(&shared.nempty);
  11.             Sem_post(&shared.mutex);
  12.             return(NULL);           /* all done */
  13.         }

  14.         shared.buff[shared.nput % NBUFF] = shared.nputval;
  15.         shared.nput++;
  16.         shared.nputval++;

  17.         Sem_post(&shared.mutex);
  18.         Sem_post(&shared.nstored);  /* 1 more stored item */
  19.         *((int *) arg) += 1;
  20.     }
  21. }
  22. /* end produce */
复制代码
消费者线程函数:
  1. /* include consume */
  2. void *
  3. consume(void *arg)
  4. {
  5.     int     i;

  6.     for ( ; ; ) {
  7.         Sem_wait(&shared.nstored);  /* wait for at least 1 stored item */
  8.         Sem_wait(&shared.mutex);

  9.         if (shared.nget >= nitems) {
  10.             Sem_post(&shared.nstored);
  11.             Sem_post(&shared.mutex);
  12.             return(NULL);           /* all done */
  13.         }

  14.         i = shared.nget % NBUFF;
  15.         if (shared.buff[i] != shared.ngetval)
  16.             printf("error: buff[%d] = %d\n", i, shared.buff[i]);
  17.         shared.nget++;
  18.         shared.ngetval++;

  19.         Sem_post(&shared.mutex);
  20.         Sem_post(&shared.nempty);   /* 1 more empty slot */
  21.         *((int *) arg) += 1;
  22.     }
  23. }
复制代码
问题1(结合后续mutex实现发问):该模型在消费者/生产者线程数目对等情况下,基本上运行时间保持不变,即10消费者/10生产者 和 50消费者/50生产者 基本运行时间差别不大
问题2:该模型在生产者/消费者线程数目出现差异时,运行时间明显恶化,即10消费者/50生产者 或 50消费者/10生产者 比10/10或50/50运行慢,请问这是为什么嗫?因为线程过多调度的问题么?但是为什么10/50线程的调度比50/50还要慢?
原因A:消费者/生产者线程 被调用机会均等,如果出现巨大差异时,则相对来讲线程数少的XX者得到调度的机会减小,导致整体性能下降?
排除该原因,测试表明生产者/消费者 线程组 彼此相对独立,线程增多的组别恶化明显
问题3:如果把生产/消费的shared.mutex分开为两个,即生产/消费的读写操作使用独立的mutex,因为有nstored和nempty的保护,修改证明可行,但是运行时间无优化,这又是为什么呢?按道理生产者/消费者独立调度应该有提升才对——实际测量有10%——20%的提升,还是有优化的!

论坛徽章:
14
水瓶座
日期:2014-06-10 09:51:0215-16赛季CBA联赛之江苏
日期:2017-11-27 11:42:3515-16赛季CBA联赛之八一
日期:2017-04-12 14:26:2815-16赛季CBA联赛之吉林
日期:2016-08-20 10:43:1215-16赛季CBA联赛之广夏
日期:2016-06-23 09:53:58程序设计版块每日发帖之星
日期:2016-02-11 06:20:00程序设计版块每日发帖之星
日期:2016-02-09 06:20:0015-16赛季CBA联赛之上海
日期:2015-12-25 16:40:3515-16赛季CBA联赛之广夏
日期:2015-12-22 09:39:36程序设计版块每日发帖之星
日期:2015-08-24 06:20:002015亚冠之德黑兰石油
日期:2015-08-07 09:57:302015年辞旧岁徽章
日期:2015-03-03 16:54:15
2 [报告]
发表于 2012-07-23 21:58 |只看该作者
本帖最后由 lxyscls 于 2012-07-30 10:08 编辑

模型B,使用简单mutex,生产者/消费者轮讯
  1. int     buff[NBUFF];
  2. struct {
  3.   pthread_mutex_t   mutex;
  4.   int               nput;   /* next index to store */
  5.   int               nputval;    /* next value to store */
  6.   int               nget; /* next index to get */
  7.   int               ngetval; /* next value to get */
  8.   int               nready;
  9. } shared = { PTHREAD_MUTEX_INITIALIZER };
复制代码
生产者线程:
  1. void *
  2. produce(void *arg)
  3. {
  4.     int i;

  5.     for ( ; ; ) {
  6.         Pthread_mutex_lock(&shared.mutex);
  7.         if (shared.nput >= nitems) {
  8.             Pthread_mutex_unlock(&shared.mutex);
  9.             return(NULL);       /* array is full, we're done */
  10.         }
  11.         if (shared.nready >= NBUFF) {
  12.             Pthread_mutex_unlock(&shared.mutex);
  13.             continue;
  14.         }
  15.         i = shared.nput % NBUFF;
  16.         buff[i] = shared.nputval;
  17.         shared.nput++;
  18.         shared.nputval++;
  19.         shared.nready++;
  20.         Pthread_mutex_unlock(&shared.mutex);

  21.         *((int *)arg) += 1;
  22.     }
  23. }
复制代码
消费者线程:
  1. void *
  2. consume(void *arg)
  3. {
  4.     int i;

  5.     for ( ; ; ) {
  6.         Pthread_mutex_lock(&shared.mutex);
  7.         if (shared.nget >= nitems) {
  8.             Pthread_mutex_unlock(&shared.mutex);
  9.             return(NULL);
  10.         }
  11.         if (shared.nready == 0) {
  12.             Pthread_mutex_unlock(&shared.mutex);
  13.             continue;
  14.         }
  15.         i = shared.nget % NBUFF;
  16.         if (buff[i] != shared.ngetval)
  17.             printf("buff[%d] = %d\n", shared.nget, buff[shared.nget]);
  18.         shared.nget++;
  19.         shared.ngetval++;
  20.         shared.nready--;
  21.         Pthread_mutex_unlock(&shared.mutex);

  22.         *((int *)arg) += 1;
  23.     }
  24. }
复制代码
该模型缺点明显,采用简单轮讯的方式,运行时间比模型A差不少
本模型居然是效率最高的!!!单次操作运行时间最少
问题1:在对等线程数目增长的情况下,50/50比10/10明显恶化,这方面的性能明显不如模型A,这个是不是可以理解为线程过多调度带来的消耗?10/50 50/10同样带来明显恶化
原因A:轮询线程数增多带来的明显性能恶化?
原因B:10/50 50/10情况恶化可否视为线程数增多恶化和差异恶化的共同结果?

论坛徽章:
14
水瓶座
日期:2014-06-10 09:51:0215-16赛季CBA联赛之江苏
日期:2017-11-27 11:42:3515-16赛季CBA联赛之八一
日期:2017-04-12 14:26:2815-16赛季CBA联赛之吉林
日期:2016-08-20 10:43:1215-16赛季CBA联赛之广夏
日期:2016-06-23 09:53:58程序设计版块每日发帖之星
日期:2016-02-11 06:20:00程序设计版块每日发帖之星
日期:2016-02-09 06:20:0015-16赛季CBA联赛之上海
日期:2015-12-25 16:40:3515-16赛季CBA联赛之广夏
日期:2015-12-22 09:39:36程序设计版块每日发帖之星
日期:2015-08-24 06:20:002015亚冠之德黑兰石油
日期:2015-08-07 09:57:302015年辞旧岁徽章
日期:2015-03-03 16:54:15
3 [报告]
发表于 2012-07-23 21:59 |只看该作者
本帖最后由 lxyscls 于 2012-07-30 10:06 编辑

模型C 仿照模型A采用mutex加cond实现
  1. struct {
  2.   pthread_mutex_t   mutex;
  3.   int               nput;   /* next index to store */
  4.   int               nputval;    /* next value to store */
  5.   int               nget; /* next index to get */
  6.   int               ngetval; /* next value to get */
  7. } shared = { PTHREAD_MUTEX_INITIALIZER };

  8. struct {
  9.     pthread_mutex_t mutex;
  10.     pthread_cond_t  cond;
  11.     int             nempty;
  12. } empty = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, NBUFF };

  13. struct {
  14.     pthread_mutex_t mutex;
  15.     pthread_cond_t  cond;
  16.     int             nstored;
  17. } stored = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0 };
复制代码
生产者线程:
  1. /* include prodcons */
  2. void *
  3. produce(void *arg)
  4. {
  5.     pthread_t tid = pthread_self();

  6.     for ( ; ; ) {
  7.         /* wait if array is full */
  8.         Pthread_mutex_lock(&empty.mutex);
  9.         while (empty.nempty == 0)
  10.             Pthread_cond_wait(&empty.cond, &empty.mutex);
  11.         empty.nempty--;
  12.         Pthread_mutex_unlock(&empty.mutex);

  13.         Pthread_mutex_lock(&shared.mutex);
  14.         if (shared.nput >= nitems) {
  15.             Pthread_mutex_lock(&stored.mutex);
  16.             stored.nstored++;
  17.             Pthread_cond_signal(&stored.cond);
  18.             Pthread_mutex_unlock(&stored.mutex);

  19.             Pthread_mutex_lock(&empty.mutex);
  20.             empty.nempty++;
  21.             Pthread_cond_signal(&empty.cond);
  22.             Pthread_mutex_unlock(&empty.mutex);

  23.             Pthread_mutex_unlock(&shared.mutex);
  24.             return(NULL);       /* array is full, we're done */
  25.         }
  26.         buff[shared.nput % NBUFF] = shared.nputval;
  27.         shared.nput++;
  28.         shared.nputval++;
  29.         Pthread_mutex_unlock(&shared.mutex);

  30.         Pthread_mutex_lock(&stored.mutex);
  31.         stored.nstored++;
  32.         Pthread_cond_signal(&stored.cond);
  33.         Pthread_mutex_unlock(&stored.mutex);

  34.         *((int *)arg) += 1;
  35.     }
  36. }
复制代码
消费者线程:
  1. void *
  2. consume(void *arg)
  3. {
  4.     pthread_t tid = pthread_self();

  5.     for ( ; ; ) {
  6.         /* wait if array is empty */
  7.         Pthread_mutex_lock(&stored.mutex);
  8.         while (stored.nstored == 0)
  9.             Pthread_cond_wait(&stored.cond, &stored.mutex);
  10.         stored.nstored--;
  11.         Pthread_mutex_unlock(&stored.mutex);

  12.         Pthread_mutex_lock(&shared.mutex);
  13.         if (shared.nget >= nitems) {
  14.             Pthread_mutex_lock(&stored.mutex);
  15.             stored.nstored++;
  16.             Pthread_cond_signal(&stored.cond);
  17.             Pthread_mutex_unlock(&stored.mutex);
  18.             Pthread_mutex_unlock(&shared.mutex);
  19.             return(NULL);
  20.         }
  21.         if (buff[shared.nget % NBUFF] != shared.ngetval)
  22.             printf("buff[%d] = %d\n", shared.nget % NBUFF, buff[shared.nget % NBUFF]);
  23.         shared.nget++;
  24.         shared.ngetval++;
  25.         Pthread_mutex_unlock(&shared.mutex);

  26.         Pthread_mutex_lock(&empty.mutex);
  27.         empty.nempty++;
  28.         Pthread_cond_signal(&empty.cond);
  29.         Pthread_mutex_unlock(&empty.mutex);

  30.         *((int *)arg) += 1;
  31.     }
  32. }
复制代码
问题1:该结构基本上和模型A没有本质区别,为什么运行时间上比模型A明显恶化?
原因A:调用过多带来的负作用?
测试表明 mutx+cond 每完成一次即比 sem 的消耗要高
原因B:模型结构基本无区别,但是cond和sem存在一个差异在于sem的post操作即便无线程wait,同样会被记录,cond如果signal时无人wait则错过该次signal
问题2:在生产者/消费者数目增加的情况下,不能像模型A一样有一个相对稳定的运行时间,50/50比10/10明显恶化,请问还时调用带来的副作用么?为什么模型A在这种条件下有一个比较稳定的运行时间嗫?

关于模型C的改进:
1、发送信号时的优化,不再在mutex内部使用,而是只有判断条件为0时记录一个flag表示需要发送信号,再在unlock之后发送信号,如下:
  1. Pthread_mutex_lock(&empty.mutex);
  2. dosignal = (empty.nempty == 0);
  3. empty.nempty++;
  4. Pthread_mutex_unlock(&empty.mutex);
  5. if (dosignal)
  6.     Pthread_cond_signal(&empty.cond);
复制代码
2、生产和消费的操作使用分开的mutex,因为原则上生产者和消费者间的互斥由mutex+cond来完成,而生产者内部和消费者内部的同步由各自独立的mutex来完成

该修改基本保证mutex+cond的使用方法在生产者/消费者线程数均等的情况下,效率基本等于原始的sem方案。
============总结===========
问题1:这三类模型在生产者/消费者数目差异激增的情况下面,运行时间都迅速恶化,请问有没有更合理的架构,能够满足这种情况?模型A只能做到生产者/消费者数目同步增长的情况下运行时间不恶化,B和C连这个做不到哦!
问题2:请问有没有更好的使用mutex+cond的方案,请不吝赐教——因为个人觉得原则上mutex应该比sem要快(这个是调试UNPV2上面部分程序的直观认识,可能不正确)
问题3:请问有什么好方法调试多线程哇,10/10已经基本上没办法调试了,更别说50/50了,gdb这个时候用用真的很吃力,1/1 1/2 2/2的情况还勉强可以

论坛徽章:
0
4 [报告]
发表于 2012-07-25 22:57 |只看该作者
看着还行!抽空看下 UNP v2

论坛徽章:
1
天蝎座
日期:2013-12-06 18:23:58
5 [报告]
发表于 2012-07-26 20:30 |只看该作者
上锁是需要代价的!

论坛徽章:
14
水瓶座
日期:2014-06-10 09:51:0215-16赛季CBA联赛之江苏
日期:2017-11-27 11:42:3515-16赛季CBA联赛之八一
日期:2017-04-12 14:26:2815-16赛季CBA联赛之吉林
日期:2016-08-20 10:43:1215-16赛季CBA联赛之广夏
日期:2016-06-23 09:53:58程序设计版块每日发帖之星
日期:2016-02-11 06:20:00程序设计版块每日发帖之星
日期:2016-02-09 06:20:0015-16赛季CBA联赛之上海
日期:2015-12-25 16:40:3515-16赛季CBA联赛之广夏
日期:2015-12-22 09:39:36程序设计版块每日发帖之星
日期:2015-08-24 06:20:002015亚冠之德黑兰石油
日期:2015-08-07 09:57:302015年辞旧岁徽章
日期:2015-03-03 16:54:15
6 [报告]
发表于 2012-07-27 10:01 |只看该作者
crazyhadoop 发表于 2012-07-26 20:30
上锁是需要代价的!

你好,我测了一下,关于每单个处理——基本上用mutex+cond的时间在900多个纳秒,sem在550个左右,确实耗销很大

请问关于多生产/多消费/有限缓冲区,是不是sem是一种更好的解决办法呢?

另外请问使用mutex+wait有没有代价更小的方法?

论坛徽章:
14
水瓶座
日期:2014-06-10 09:51:0215-16赛季CBA联赛之江苏
日期:2017-11-27 11:42:3515-16赛季CBA联赛之八一
日期:2017-04-12 14:26:2815-16赛季CBA联赛之吉林
日期:2016-08-20 10:43:1215-16赛季CBA联赛之广夏
日期:2016-06-23 09:53:58程序设计版块每日发帖之星
日期:2016-02-11 06:20:00程序设计版块每日发帖之星
日期:2016-02-09 06:20:0015-16赛季CBA联赛之上海
日期:2015-12-25 16:40:3515-16赛季CBA联赛之广夏
日期:2015-12-22 09:39:36程序设计版块每日发帖之星
日期:2015-08-24 06:20:002015亚冠之德黑兰石油
日期:2015-08-07 09:57:302015年辞旧岁徽章
日期:2015-03-03 16:54:15
7 [报告]
发表于 2015-07-23 20:09 |只看该作者
原来自己怎么做得都忘了,居然最简单的速度最快,快了一个数量级

论坛徽章:
14
水瓶座
日期:2014-06-10 09:51:0215-16赛季CBA联赛之江苏
日期:2017-11-27 11:42:3515-16赛季CBA联赛之八一
日期:2017-04-12 14:26:2815-16赛季CBA联赛之吉林
日期:2016-08-20 10:43:1215-16赛季CBA联赛之广夏
日期:2016-06-23 09:53:58程序设计版块每日发帖之星
日期:2016-02-11 06:20:00程序设计版块每日发帖之星
日期:2016-02-09 06:20:0015-16赛季CBA联赛之上海
日期:2015-12-25 16:40:3515-16赛季CBA联赛之广夏
日期:2015-12-22 09:39:36程序设计版块每日发帖之星
日期:2015-08-24 06:20:002015亚冠之德黑兰石油
日期:2015-08-07 09:57:302015年辞旧岁徽章
日期:2015-03-03 16:54:15
8 [报告]
发表于 2015-07-23 20:39 |只看该作者
不过这种测试的方法有问题:生产者会持续的产生数据
而对于真正的使用场景中,这种假设不一定成立

那么模型B的开销就明显比A和C要大了,因为不停的lock/unlock

论坛徽章:
14
水瓶座
日期:2014-06-10 09:51:0215-16赛季CBA联赛之江苏
日期:2017-11-27 11:42:3515-16赛季CBA联赛之八一
日期:2017-04-12 14:26:2815-16赛季CBA联赛之吉林
日期:2016-08-20 10:43:1215-16赛季CBA联赛之广夏
日期:2016-06-23 09:53:58程序设计版块每日发帖之星
日期:2016-02-11 06:20:00程序设计版块每日发帖之星
日期:2016-02-09 06:20:0015-16赛季CBA联赛之上海
日期:2015-12-25 16:40:3515-16赛季CBA联赛之广夏
日期:2015-12-22 09:39:36程序设计版块每日发帖之星
日期:2015-08-24 06:20:002015亚冠之德黑兰石油
日期:2015-08-07 09:57:302015年辞旧岁徽章
日期:2015-03-03 16:54:15
9 [报告]
发表于 2015-08-13 14:51 |只看该作者
Locks Aren't Slow; Lock Contention Is
http://preshing.com/20111118/locks-arent-slow-lock-contention-is/

参考文中所述:所有工作均在lock中完成,效率很低
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP