lxyscls 发表于 2012-07-23 21:57

关于 多生产者/多消费者/缓冲区 几种模型的数个问题,请不吝赐教,谢谢!

本帖最后由 lxyscls 于 2012-07-30 10:09 编辑

基本模型为UNP V2上面的实现
1、1000000次写和读,从0写到1000000,同样从0读到1000000,如果读数有错误消费者报错
2、缓冲区数组大小10,循环写入,循环读出,buf = n,读出按同样的方式进行检查

模型A(UNPV2上面已实现,使用POSIX信号量)struct {    /* data shared by producers and consumers */
int   buff;
int   nput;         /* item number: 0, 1, 2, ... */
int   nputval;      /* value to store in buff[] */
int   nget;         /* item number: 0, 1, 2, ... */
int   ngetval;      /* value fetched from buff[] */
sem_t mutex, nempty, nstored;   /* semaphores, not pointers */
} shared;nput, nputval供生产者使用,nget,ngetval供消费者使用,nempty表示缓冲区中空位置,nstored表示缓冲区中已存个数,mutex锁操作      /* 4initialize three semaphores */
    Sem_init(&shared.mutex, 0, 1);
    Sem_init(&shared.nempty, 0, NBUFF);
    Sem_init(&shared.nstored, 0, 0);生产者线程函数:/* include produce */
void *
produce(void *arg)
{
    for ( ; ; ) {
      Sem_wait(&shared.nempty);   /* wait for at least 1 empty slot */
      Sem_wait(&shared.mutex);

      if (shared.nput >= nitems) {
            Sem_post(&shared.nstored);/* let consumers terminate */
            Sem_post(&shared.nempty);
            Sem_post(&shared.mutex);
            return(NULL);         /* all done */
      }

      shared.buff = shared.nputval;
      shared.nput++;
      shared.nputval++;

      Sem_post(&shared.mutex);
      Sem_post(&shared.nstored);/* 1 more stored item */
      *((int *) arg) += 1;
    }
}
/* end produce */
消费者线程函数:/* include consume */
void *
consume(void *arg)
{
    int   i;

    for ( ; ; ) {
      Sem_wait(&shared.nstored);/* wait for at least 1 stored item */
      Sem_wait(&shared.mutex);

      if (shared.nget >= nitems) {
            Sem_post(&shared.nstored);
            Sem_post(&shared.mutex);
            return(NULL);         /* all done */
      }

      i = shared.nget % NBUFF;
      if (shared.buff != shared.ngetval)
            printf("error: buff[%d] = %d\n", i, shared.buff);
      shared.nget++;
      shared.ngetval++;

      Sem_post(&shared.mutex);
      Sem_post(&shared.nempty);   /* 1 more empty slot */
      *((int *) arg) += 1;
    }
}
问题1(结合后续mutex实现发问):该模型在消费者/生产者线程数目对等情况下,基本上运行时间保持不变,即10消费者/10生产者 和 50消费者/50生产者 基本运行时间差别不大
问题2:该模型在生产者/消费者线程数目出现差异时,运行时间明显恶化,即10消费者/50生产者 或 50消费者/10生产者 比10/10或50/50运行慢,请问这是为什么嗫?因为线程过多调度的问题么?但是为什么10/50线程的调度比50/50还要慢?
原因A:消费者/生产者线程 被调用机会均等,如果出现巨大差异时,则相对来讲线程数少的XX者得到调度的机会减小,导致整体性能下降?
排除该原因,测试表明生产者/消费者 线程组 彼此相对独立,线程增多的组别恶化明显
问题3:如果把生产/消费的shared.mutex分开为两个,即生产/消费的读写操作使用独立的mutex,因为有nstored和nempty的保护,修改证明可行,但是运行时间无优化,这又是为什么呢?按道理生产者/消费者独立调度应该有提升才对——实际测量有10%——20%的提升,还是有优化的!

lxyscls 发表于 2012-07-23 21:58

本帖最后由 lxyscls 于 2012-07-30 10:08 编辑

模型B,使用简单mutex,生产者/消费者轮讯int   buff;
struct {
pthread_mutex_t   mutex;
int               nput;   /* next index to store */
int               nputval;    /* next value to store */
int               nget; /* next index to get */
int               ngetval; /* next value to get */
int               nready;
} shared = { PTHREAD_MUTEX_INITIALIZER };
生产者线程:void *
produce(void *arg)
{
    int i;

    for ( ; ; ) {
      Pthread_mutex_lock(&shared.mutex);
      if (shared.nput >= nitems) {
            Pthread_mutex_unlock(&shared.mutex);
            return(NULL);       /* array is full, we're done */
      }
      if (shared.nready >= NBUFF) {
            Pthread_mutex_unlock(&shared.mutex);
            continue;
      }
      i = shared.nput % NBUFF;
      buff = shared.nputval;
      shared.nput++;
      shared.nputval++;
      shared.nready++;
      Pthread_mutex_unlock(&shared.mutex);

      *((int *)arg) += 1;
    }
}
消费者线程:void *
consume(void *arg)
{
    int i;

    for ( ; ; ) {
      Pthread_mutex_lock(&shared.mutex);
      if (shared.nget >= nitems) {
            Pthread_mutex_unlock(&shared.mutex);
            return(NULL);
      }
      if (shared.nready == 0) {
            Pthread_mutex_unlock(&shared.mutex);
            continue;
      }
      i = shared.nget % NBUFF;
      if (buff != shared.ngetval)
            printf("buff[%d] = %d\n", shared.nget, buff);
      shared.nget++;
      shared.ngetval++;
      shared.nready--;
      Pthread_mutex_unlock(&shared.mutex);

      *((int *)arg) += 1;
    }
}
该模型缺点明显,采用简单轮讯的方式,运行时间比模型A差不少
本模型居然是效率最高的!!!单次操作运行时间最少
问题1:在对等线程数目增长的情况下,50/50比10/10明显恶化,这方面的性能明显不如模型A,这个是不是可以理解为线程过多调度带来的消耗?10/50 50/10同样带来明显恶化
原因A:轮询线程数增多带来的明显性能恶化?
原因B:10/50 50/10情况恶化可否视为线程数增多恶化和差异恶化的共同结果?

lxyscls 发表于 2012-07-23 21:59

本帖最后由 lxyscls 于 2012-07-30 10:06 编辑

模型C 仿照模型A采用mutex加cond实现struct {
pthread_mutex_t   mutex;
int               nput;   /* next index to store */
int               nputval;    /* next value to store */
int               nget; /* next index to get */
int               ngetval; /* next value to get */
} shared = { PTHREAD_MUTEX_INITIALIZER };

struct {
    pthread_mutex_t mutex;
    pthread_cond_tcond;
    int             nempty;
} empty = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, NBUFF };

struct {
    pthread_mutex_t mutex;
    pthread_cond_tcond;
    int             nstored;
} stored = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0 };生产者线程:/* include prodcons */
void *
produce(void *arg)
{
    pthread_t tid = pthread_self();

    for ( ; ; ) {
      /* wait if array is full */
      Pthread_mutex_lock(&empty.mutex);
      while (empty.nempty == 0)
            Pthread_cond_wait(&empty.cond, &empty.mutex);
      empty.nempty--;
      Pthread_mutex_unlock(&empty.mutex);

      Pthread_mutex_lock(&shared.mutex);
      if (shared.nput >= nitems) {
            Pthread_mutex_lock(&stored.mutex);
            stored.nstored++;
            Pthread_cond_signal(&stored.cond);
            Pthread_mutex_unlock(&stored.mutex);

            Pthread_mutex_lock(&empty.mutex);
            empty.nempty++;
            Pthread_cond_signal(&empty.cond);
            Pthread_mutex_unlock(&empty.mutex);

            Pthread_mutex_unlock(&shared.mutex);
            return(NULL);       /* array is full, we're done */
      }
      buff = shared.nputval;
      shared.nput++;
      shared.nputval++;
      Pthread_mutex_unlock(&shared.mutex);

      Pthread_mutex_lock(&stored.mutex);
      stored.nstored++;
      Pthread_cond_signal(&stored.cond);
      Pthread_mutex_unlock(&stored.mutex);

      *((int *)arg) += 1;
    }
}
消费者线程:void *
consume(void *arg)
{
    pthread_t tid = pthread_self();

    for ( ; ; ) {
      /* wait if array is empty */
      Pthread_mutex_lock(&stored.mutex);
      while (stored.nstored == 0)
            Pthread_cond_wait(&stored.cond, &stored.mutex);
      stored.nstored--;
      Pthread_mutex_unlock(&stored.mutex);

      Pthread_mutex_lock(&shared.mutex);
      if (shared.nget >= nitems) {
            Pthread_mutex_lock(&stored.mutex);
            stored.nstored++;
            Pthread_cond_signal(&stored.cond);
            Pthread_mutex_unlock(&stored.mutex);
            Pthread_mutex_unlock(&shared.mutex);
            return(NULL);
      }
      if (buff != shared.ngetval)
            printf("buff[%d] = %d\n", shared.nget % NBUFF, buff);
      shared.nget++;
      shared.ngetval++;
      Pthread_mutex_unlock(&shared.mutex);

      Pthread_mutex_lock(&empty.mutex);
      empty.nempty++;
      Pthread_cond_signal(&empty.cond);
      Pthread_mutex_unlock(&empty.mutex);

      *((int *)arg) += 1;
    }
}
问题1:该结构基本上和模型A没有本质区别,为什么运行时间上比模型A明显恶化?
原因A:调用过多带来的负作用?
测试表明 mutx+cond 每完成一次即比 sem 的消耗要高
原因B:模型结构基本无区别,但是cond和sem存在一个差异在于sem的post操作即便无线程wait,同样会被记录,cond如果signal时无人wait则错过该次signal
问题2:在生产者/消费者数目增加的情况下,不能像模型A一样有一个相对稳定的运行时间,50/50比10/10明显恶化,请问还时调用带来的副作用么?为什么模型A在这种条件下有一个比较稳定的运行时间嗫?

关于模型C的改进:
1、发送信号时的优化,不再在mutex内部使用,而是只有判断条件为0时记录一个flag表示需要发送信号,再在unlock之后发送信号,如下:Pthread_mutex_lock(&empty.mutex);
dosignal = (empty.nempty == 0);
empty.nempty++;
Pthread_mutex_unlock(&empty.mutex);
if (dosignal)
    Pthread_cond_signal(&empty.cond);2、生产和消费的操作使用分开的mutex,因为原则上生产者和消费者间的互斥由mutex+cond来完成,而生产者内部和消费者内部的同步由各自独立的mutex来完成

该修改基本保证mutex+cond的使用方法在生产者/消费者线程数均等的情况下,效率基本等于原始的sem方案。
============总结===========
问题1:这三类模型在生产者/消费者数目差异激增的情况下面,运行时间都迅速恶化,请问有没有更合理的架构,能够满足这种情况?模型A只能做到生产者/消费者数目同步增长的情况下运行时间不恶化,B和C连这个做不到哦!
问题2:请问有没有更好的使用mutex+cond的方案,请不吝赐教——因为个人觉得原则上mutex应该比sem要快(这个是调试UNPV2上面部分程序的直观认识,可能不正确)
问题3:请问有什么好方法调试多线程哇,10/10已经基本上没办法调试了,更别说50/50了,gdb这个时候用用真的很吃力,1/1 1/2 2/2的情况还勉强可以

xiyoulaoyuanjia 发表于 2012-07-25 22:57

看着还行!抽空看下 UNP v2 :em03:

crazyhadoop 发表于 2012-07-26 20:30

上锁是需要代价的!

lxyscls 发表于 2012-07-27 10:01

crazyhadoop 发表于 2012-07-26 20:30 static/image/common/back.gif
上锁是需要代价的!
你好,我测了一下,关于每单个处理——基本上用mutex+cond的时间在900多个纳秒,sem在550个左右,确实耗销很大

请问关于多生产/多消费/有限缓冲区,是不是sem是一种更好的解决办法呢?

另外请问使用mutex+wait有没有代价更小的方法?

lxyscls 发表于 2015-07-23 20:09

原来自己怎么做得都忘了,居然最简单的速度最快,快了一个数量级{:yct50:}

lxyscls 发表于 2015-07-23 20:39

不过这种测试的方法有问题:生产者会持续的产生数据
而对于真正的使用场景中,这种假设不一定成立

那么模型B的开销就明显比A和C要大了,因为不停的lock/unlock

lxyscls 发表于 2015-08-13 14:51

Locks Aren't Slow; Lock Contention Is
http://preshing.com/20111118/locks-arent-slow-lock-contention-is/

参考文中所述:所有工作均在lock中完成,效率很低
页: [1]
查看完整版本: 关于 多生产者/多消费者/缓冲区 几种模型的数个问题,请不吝赐教,谢谢!