免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 3708 | 回复: 6
打印 上一主题 下一主题

多对多的生产者消费者程序,大家帮忙看看 [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2009-11-13 20:44 |只看该作者 |倒序浏览
用条件变量和锁进行线程控制,问题如注释所写。就是消费者线程内的那个usleep语句,为什么它那么重要
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

//多个生产者多个消费者

#define BUFFSIZ 100000
#define MAXTHREADS 10

struct
{
&nbsp;&nbsp;&nbsp;&nbsp;int value;
&nbsp;&nbsp;&nbsp;&nbsp;pthread_t tid;
} buf[BUFFSIZ];

struct
{
&nbsp;&nbsp;&nbsp;&nbsp;pthread_mutex_t mutex;
&nbsp;&nbsp;&nbsp;&nbsp;int index;//生产者使用,索引buf位置

} shared = { PTHREAD_MUTEX_INITIALIZER, 0};

struct
{
&nbsp;&nbsp;&nbsp;&nbsp;pthread_mutex_t mutex;
&nbsp;&nbsp;&nbsp;&nbsp;pthread_cond_t cond;
&nbsp;&nbsp;&nbsp;&nbsp;int ready;//在该索引及之前的元素可以消费

&nbsp;&nbsp;&nbsp;&nbsp;int consume;//消费者将要消费的元素下标

} nready = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, -1, 0};

void *produce(void *arg)
{
&nbsp;&nbsp;&nbsp;&nbsp;for (;;)
&nbsp;&nbsp;&nbsp;&nbsp;{
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;pthread_mutex_lock(&shared.mutex);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if (shared.index > BUFFSIZ - 1)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;{
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;pthread_mutex_unlock(&shared.mutex);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return NULL;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;buf[shared.index].value = shared.index;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;buf[shared.index].tid = pthread_self();
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;++shared.index;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;pthread_mutex_unlock(&shared.mutex);

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;pthread_mutex_lock(&nready.mutex);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;++nready.ready;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if ( nready.ready >= nready.consume)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;pthread_cond_broadcast(&nready.cond);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;pthread_mutex_unlock(&nready.mutex);

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;++(*(int *)arg);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;usleep(1);
&nbsp;&nbsp;&nbsp;&nbsp;}
}

void *consume(void *arg)
{
&nbsp;&nbsp;&nbsp;&nbsp;int i;
&nbsp;&nbsp;&nbsp;&nbsp;for (;;)
&nbsp;&nbsp;&nbsp;&nbsp;{
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;pthread_mutex_lock(&nready.mutex);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if ( nready.consume>= BUFFSIZ)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;{
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;pthread_mutex_unlock(&nready.mutex);
&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;printf("consume thread %lu is donen", pthread_self());&nbsp;&nbsp;&nbsp;&nbsp;

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return NULL;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;while ( nready.ready < nready.consume)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;pthread_cond_wait(&nready.cond, &nready.mutex);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if ( buf[nready.consume].value != nready.consume)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;printf("error: buf[%d]=%d, tid = %lun", nready.consume, buf[nready.consume].value, buf[nready.consume].tid);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;++nready.consume;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;pthread_mutex_unlock(&nready.mutex);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;usleep(1);// 如果没有这句,多个消费者就会死锁。


&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;++(*(int *)arg);
&nbsp;&nbsp;&nbsp;&nbsp;}
}


int main(int argc, char *argv[])
{
&nbsp;&nbsp;&nbsp;&nbsp;int&nbsp;&nbsp;&nbsp;&nbsp;i, nprod, ncons ;
&nbsp;&nbsp;&nbsp;&nbsp;pthread_t&nbsp;&nbsp;&nbsp;&nbsp;tid_produce[MAXTHREADS], tid_consume[MAXTHREADS];
&nbsp;&nbsp;&nbsp;&nbsp;int pcount[MAXTHREADS] = {0};
&nbsp;&nbsp;&nbsp;&nbsp;int ccount[MAXTHREADS] = {0};

&nbsp;&nbsp;&nbsp;&nbsp;if (argc != 3)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;perror("usage: a.out <#producer线程数> <#consumer线程数>"), exit(1);
&nbsp;&nbsp;&nbsp;&nbsp;nprod = atoi(argv[1])> MAXTHREADS ? MAXTHREADS : atoi(argv[1]);
&nbsp;&nbsp;&nbsp;&nbsp;ncons = atoi(argv[2])> MAXTHREADS ? MAXTHREADS : atoi(argv[2]);

&nbsp;&nbsp;&nbsp;&nbsp;pthread_setconcurrency(nprod+ncons);
&nbsp;&nbsp;&nbsp;&nbsp;for (i = 0; i < nprod; i++)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;pthread_create(&tid_produce[i], NULL, produce, &pcount[i]);
&nbsp;&nbsp;&nbsp;&nbsp;for (i = 0; i < ncons; i++)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;pthread_create(&tid_consume[i], NULL, consume, &ccount[i]);

&nbsp;&nbsp;&nbsp;&nbsp;for (i = 0; i < nprod; i++)
&nbsp;&nbsp;&nbsp;&nbsp;{
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;pthread_join(tid_produce[i], NULL);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;printf("produce thread %lu is donen", tid_produce[i]);&nbsp;&nbsp;&nbsp;&nbsp;
&nbsp;&nbsp;&nbsp;&nbsp;}
&nbsp;&nbsp;&nbsp;&nbsp;for (i = 0; i < ncons; i++)
&nbsp;&nbsp;&nbsp;&nbsp;{
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;pthread_join(tid_consume[i], NULL);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;printf("consume thread %lu is donen", tid_consume[i]);&nbsp;&nbsp;&nbsp;&nbsp;
&nbsp;&nbsp;&nbsp;&nbsp;}

&nbsp;&nbsp;&nbsp;&nbsp;for(i=0; i<nprod; ++i)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;printf(" producer %d n", pcount[i]);&nbsp;&nbsp;&nbsp;&nbsp;
&nbsp;&nbsp;&nbsp;&nbsp;for(i=0; i<ncons; ++i)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;printf(" consumer %d n", ccount[i]);&nbsp;&nbsp;&nbsp;&nbsp;

&nbsp;&nbsp;&nbsp;&nbsp;exit(0);
}

论坛徽章:
0
2 [报告]
发表于 2009-11-13 20:52 |只看该作者
chinaunix的代码格式比较好看,颜色很鲜明

论坛徽章:
1
天蝎座
日期:2013-10-23 21:11:03
3 [报告]
发表于 2009-11-14 20:37 |只看该作者
见过类似情况
那是Python里面的,一次起多个线程会报错
得起了一个sleep一小会再起一个线程才行

等待答案……

论坛徽章:
0
4 [报告]
发表于 2009-12-01 09:13 |只看该作者

论坛徽章:
8
CU大牛徽章
日期:2013-04-17 10:59:39CU大牛徽章
日期:2013-04-17 11:01:45CU大牛徽章
日期:2013-04-17 11:02:15CU大牛徽章
日期:2013-04-17 11:02:36CU大牛徽章
日期:2013-04-17 11:02:58技术图书徽章
日期:2013-12-04 10:48:50酉鸡
日期:2014-01-03 10:32:30辰龙
日期:2014-03-06 15:04:07
5 [报告]
发表于 2009-12-01 10:21 |只看该作者
不是死锁,是死循环。


假如线程1持有锁,并且在解锁后立刻重新上锁;那么在解锁和上锁中间,必须有线程切换,否则其他线程几乎永远拿不到锁。


因为线程1持有锁时,其他线程将因为等待条件不满足而被操作系统放入等待队列;那么线程1释放锁,其他线程只是从等待状态回到了就绪状态;除非线程1时间片到,否则操作系统不会重新分配CPU时间片,其他线程也就不会得到执行时间。

而在线程1中,从ulock到下一次lock,中间仅仅几十甚至几个机器指令那么一点点时间。如果在这么几个机器指令执行时间里,线程1没有因为时间片到而被挂起,那么它就会重新上锁,于是就导致其他诸线程被重新放入等待队列;于是到了时间片切换时,因为其他线程未就绪,线程1必将继续得到时间片……


加入usleep,相当于不持有锁情况下的线程1主动让出控制权(因为等待定时器信号而陷入休眠);于是就相当于强制操作系统进行下一轮优先级评估和时间片分配,其他线程就有了执行机会。

[ 本帖最后由 shan_ghost 于 2009-12-1 10:47 编辑 ]

论坛徽章:
0
6 [报告]
发表于 2009-12-01 14:25 |只看该作者

回复 #5 shan_ghost 的帖子

在经典的round_robin调度中,会存在这样的问题。说白了,就是其他线程被“饿死”。
但是,目前,大多数的操作系统都有防饿死的功能,一个线程随着其占用的CPU时间的增多,其优先级会下降。总会有那么一个时刻,其在unlock后,会被比他优先级更高的线程抢占。

论坛徽章:
8
CU大牛徽章
日期:2013-04-17 10:59:39CU大牛徽章
日期:2013-04-17 11:01:45CU大牛徽章
日期:2013-04-17 11:02:15CU大牛徽章
日期:2013-04-17 11:02:36CU大牛徽章
日期:2013-04-17 11:02:58技术图书徽章
日期:2013-12-04 10:48:50酉鸡
日期:2014-01-03 10:32:30辰龙
日期:2014-03-06 15:04:07
7 [报告]
发表于 2009-12-01 14:58 |只看该作者

回复 #6 drowndog 的帖子

问题是要等到操作系统防饿死机制启动、在线程1释放锁的时候立刻剥夺它的时间片,这个时间可就不是一般的长了。

我在某个项目里遇到过类似问题。一个网络程序,用打印字符串的方式来检查其他线程执行情况。
加个usleep,字符串打印非常频繁,好像刷屏一样,CPU占用也变得极低;去掉usleep,平均每半分钟左右才能打出一次字符串,CPU占用也彪升到90%以上。
当然,这也和他们的主线程设计有关,有事没事都瞎扫描。

注意: 防饿死是用来防止情况劣化到完全不可收拾地步的,不是用来帮你弥补程序设计缺陷的。

[ 本帖最后由 shan_ghost 于 2009-12-1 15:00 编辑 ]
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP