mjus 发表于 2016-08-18 11:01

如何在多线程中保证结果输出顺序与数据输入顺序一样?

现假定仅一线程读取输入数据,三个(或任意个)Workers同时分别地处理数据,另一个线程输出已处理的数据,但要求其输出数据的顺序与输入数据顺序一致。为简单计,例如,若输入1,2,3,…N个整数,多线程处理后,输出的顺序仍为1,2,3,…N。

下面简单程序尝试实现以上目标,但未成功,大多数情况下,输出是正确的。有时程序冻结某点,沒有全部输出所有数据。请教各位还需要什么同步机制才能保证任何情况下全部依序输岀?谢谢!
include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <semaphore.h>
#include <errno.h>
#define check(rc, msg) { \
      if (!rc) \
          fprintf(stderr, "%s failed \n", msg), exit(1); \
}

#define dbg(...)fprintf(stderr,__VA_ARGS__)

#define NBUFF       16      /* number of slots */
#define NP          1       /* number of producers */
#define NC          3       /* number of consumers */

int widx = 0, ridx = 0, qoidx = 0, qridx = 0;

struct rec {
    struct {
    int    data;
    size_t seq;
    } buff;
    sem_t full;         /* number of full slots*/
    sem_t empty;          /* number of empty slots */
    sem_t rmut;         /* mutual exclusion to shared data */
    sem_t wmut;         /* mutual exclusion to shared data */
} shared, sha2;

size_t seq = 1;
void calc(size_t lineNbr, int val) {
      /* do something with.., ie. val */
      usleep(rand()%307);

      sem_wait(&sha2.empty);
      sem_wait(&sha2.wmut);
      sha2.buff.data = val;
      sha2.buff.seq = lineNbr;
      dbg("QIn : %ld -> %d \n", sha2.buff.seq, sha2.buff.data);
      qoidx = (qoidx+1) % NBUFF;
      sem_post(&sha2.full);
      sem_post(&sha2.wmut);
}

size_t cnt = 1; int fg=0;
void* seqOut(void* arg) {
      for(int i=0;i<=seq && cnt<seq;i++) {
         //dbg("..Loop 1.. \n");
         sem_wait(&sha2.full);
         fg=0;
         while(!fg) {
         if(sha2.buff.seq == cnt) {
            fprintf(stderr,"Output : {%ld: %d}\n", cnt, sha2.buff.data);
            cnt++;
            fg=1;
            sem_post(&sha2.empty);
            //dbg("..Loop 2.. \n");
         }
         else {
            fg=0;
            qridx = (qridx+1) % NBUFF;
            //dbg("..Loop 3..qridx=%d \n", qridx);
          }
         }
         qridx = (qridx+1) % NBUFF;
    }
return NULL;
}

void *Producer(void *arg) {
   int i;

   for(i=1; i<=50; i++) {
            /* If no empty slot, wait */
            sem_wait(&shared.empty);

            shared.buff.data = i;
            shared.buff.seq = seq++;
            //fprintf(stderr,"Read : [%ld -> %d] \n", shared.buff.seq, shared.buff.data);
            widx = (widx+1)%NBUFF;

            sem_post(&shared.full);
       }
   for(int i=0; i<NC; i++) {
            sem_wait(&shared.empty);
            shared.buff.data = -999;
            widx = (widx+1)%NBUFF;
            sem_post(&shared.full);
   }
return NULL;
}

void *Consumer(void *arg) {
    int n, lineNbr;

    while (1) {
      sem_wait(&shared.full);
      /* If another thread uses the buffer, wait */
      sem_wait(&shared.rmut);

      if(shared.buff.data == -999) {
          ridx = (ridx+1)%NBUFF;
          sem_post(&shared.empty);
          sem_post(&shared.rmut);
          return NULL;
      }

      n = shared.buff.data;
      lineNbr = shared.buff.seq;

      ridx = (ridx+1)%NBUFF;

      sem_post(&shared.empty);
      sem_post(&shared.rmut);
      calc(lineNbr, n);
    }

    return NULL;
}

int main(int argc, char** argv) {
    pthread_t idP,    idC,    seqThr;
    int       indexP, indexC, i;

    check( sem_init(&shared.full, 0, 0 )== 0, "init full err");
    check( sem_init(&sha2.full, 0, 0 )== 0, "init full2 err");
    check( sem_init(&shared.empty, 0, NBUFF) == 0, "init empty err");
    check( sem_init(&sha2.empty, 0, NBUFF) == 0, "init empty2 err");
    check( sem_init(&shared.rmut, 0, 1) == 0, "init reader mutex err");
    check( sem_init(&sha2.rmut, 0, 1) == 0, "init reader2 mutex err");
    check( sem_init(&shared.wmut, 0, 1) == 0, "init writer mutex err");
    check( sem_init(&sha2.wmut, 0, 1) == 0, "init writer2 mutex err");
    for (i = 0; i < NP; i++) {
      indexP = i+1;
      check( pthread_create(&idP, NULL, Producer, &indexP) == 0, "create Pth err");
    }

    for (i = 0; i < NC; i++) {
      indexC = i+1;
      check( pthread_create(&idC, NULL, Consumer, &indexC) == 0, "create Cth err");
    }
    check( pthread_create(&seqThr, NULL, seqOut, NULL) == 0, "create seqThr err");

    for (i = 0; i < NP; i++) {
      check( pthread_join(idP, NULL) == 0, "join Pth err");
    }

    for (i = 0; i < NC; i++) {
      check( pthread_join(idC, NULL) == 0, "join Cth err");
    }

    check( pthread_join(seqThr, NULL) == 0, "join seqThr err");

    check( sem_destroy(&shared.full) == 0, "destroy full err");
    check( sem_destroy(&sha2.full) == 0, "destroy full2 err");
    check( sem_destroy(&shared.empty) == 0, "destroy empty err");
    check( sem_destroy(&sha2.empty) == 0, "destroy empty2 err");
    check( sem_destroy(&shared.rmut) == 0, "destroy rdmutex err");
    check( sem_destroy(&sha2.rmut) == 0, "destroy rdmutex2 err");
    check( sem_destroy(&shared.wmut) == 0, "destroy wrmutex err");
    check( sem_destroy(&sha2.wmut) == 0, "destroy wrmutex2 err");
    fprintf(stderr,"all OK \n");
    return 0;
}
                                                                                                                                    168,1         Bot

mjus 发表于 2016-08-18 11:03

一种可能输出,正确的.
QIn : 1 -> 1
Output : {1: 1}
QIn : 2 -> 2
Output : {2: 2}
QIn : 4 -> 4
QIn : 3 -> 3
Output : {3: 3}
Output : {4: 4}
QIn : 5 -> 5
Output : {5: 5}
QIn : 6 -> 6
QIn : 7 -> 7
Output : {6: 6}
Output : {7: 7}
QIn : 8 -> 8
Output : {8: 8}
QIn : 10 -> 10
QIn : 11 -> 11
QIn : 12 -> 12
QIn : 9 -> 9
Output : {9: 9}
Output : {10: 10}
Output : {11: 11}
Output : {12: 12}
QIn : 13 -> 13
Output : {13: 13}
QIn : 16 -> 16
QIn : 14 -> 14
Output : {14: 14}
Output : {15: 15}
QIn : 15 -> 15
Output : {16: 16}
QIn : 17 -> 17
Output : {17: 17}
QIn : 20 -> 20
QIn : 18 -> 18
Output : {18: 18}
Output : {19: 19}
QIn : 19 -> 19
Output : {20: 20}
QIn : 21 -> 21
Output : {21: 21}
QIn : 23 -> 23
QIn : 24 -> 24
QIn : 22 -> 22
Output : {22: 22}
Output : {23: 23}
QIn : 27 -> 27
Output : {24: 24}
QIn : 26 -> 26
QIn : 28 -> 28
QIn : 25 -> 25
Output : {25: 25}
Output : {26: 26}
Output : {27: 27}
Output : {28: 28}
QIn : 32 -> 32
QIn : 30 -> 30
QIn : 31 -> 31
QIn : 29 -> 29
Output : {29: 29}
Output : {30: 30}
Output : {31: 31}
Output : {32: 32}
QIn : 34 -> 34
QIn : 33 -> 33
Output : {33: 33}
Output : {34: 34}
QIn : 36 -> 36
QIn : 35 -> 35
Output : {35: 35}
Output : {36: 36}
QIn : 37 -> 37
Output : {37: 37}
QIn : 38 -> 38
Output : {38: 38}
QIn : 41 -> 41
QIn : 42 -> 42
QIn : 39 -> 39
Output : {39: 39}
QIn : 40 -> 40
Output : {40: 40}
Output : {41: 41}
Output : {42: 42}
QIn : 43 -> 43
Output : {43: 43}
QIn : 44 -> 44
Output : {44: 44}
QIn : 47 -> 47
QIn : 48 -> 48
QIn : 45 -> 45
Output : {45: 45}
QIn : 46 -> 46
Output : {46: 46}
Output : {47: 47}
Output : {48: 48}
QIn : 49 -> 49
Output : {49: 49}
QIn : 50 -> 50
Output : {50: 50}
all OK

mjus 发表于 2016-08-18 11:06

另一种可能输出,不正确的.
QIn : 1 -> 1
Output : {1: 1}
QIn : 2 -> 2
Output : {2: 2}
QIn : 3 -> 3
Output : {3: 3}
QIn : 4 -> 4
Output : {4: 4}
QIn : 5 -> 5
Output : {5: 5}
QIn : 7 -> 7
QIn : 6 -> 6
Output : {6: 6}
Output : {7: 7}
QIn : 8 -> 8
Output : {8: 8}
QIn : 11 -> 11
QIn : 9 -> 9
QIn : 10 -> 10
QIn : 12 -> 12
Output : {9: 9}
Output : {10: 10}
Output : {11: 11}
Output : {12: 12}
QIn : 16 -> 16
QIn : 13 -> 13
Output : {13: 13}
Output : {14: 14}
QIn : 14 -> 14
QIn : 17 -> 17
QIn : 18 -> 18
QIn : 20 -> 20
QIn : 19 -> 19
QIn : 23 -> 23
QIn : 21 -> 21
QIn : 22 -> 22
QIn : 24 -> 24
QIn : 27 -> 27
QIn : 25 -> 25
QIn : 26 -> 26
QIn : 28 -> 28
QIn : 30 -> 30
QIn : 31 -> 31
QIn : 29 -> 29
程序冻结,沒有全部输出所有数据

yulihua49 发表于 2016-08-18 15:09

本帖最后由 yulihua49 于 2016-08-18 15:14 编辑

没办法。并行数据处理的必定无序。数据的序就靠索引,用索引定序吧。
处理结果存入数据库,排序后输出。

或者,每个数据配个顺序号,处理完了按顺序号插入map,然后遍历map输出。
如果是分布处理,数据分布在多个服务器上,只有存入数据库一种方法比较简单。

mjus 发表于 2016-08-18 21:14

不会吧?我看过 python / golang code, 他们 都做到了,而且与无序输出运行时间差很少!现只考虑在一台机上运行。C++码也欢迎!

lxyscls 发表于 2016-08-19 11:36

本帖最后由 lxyscls 于 2016-08-19 11:41 编辑

mjus 发表于 2016-08-18 21:14 static/image/common/back.gif
不会吧?我看过 python / golang code, 他们 都做到了,而且与无序输出运行时间差很少!现只考虑在一台机上 ...

你的N是多少?50?50和50log50可不就不差太多

map的方法前面有人讲过了,再具体一点unordered_map

worker thread 处理完后insert,key是输入序号

output thread while(1)根据序号get,get到就说明已插入,没有就retry

mjus 发表于 2016-08-19 21:34

你不要想太简单,这里只为简单示例,实际上N为千万級,或亿上大数据处理的!用map足够安全吗?关鍵怎样处理同步问题

folklore 发表于 2016-08-19 22:00

将输出独立出来,
用数组做参数, 一个线程处理一个数组元素。

mjus 发表于 2016-08-19 22:07

Make sense ! But How ?please show me something magic ! Trust you ! Thank you so much !!!

lxyscls 发表于 2016-08-20 10:45

本帖最后由 lxyscls 于 2016-08-20 10:49 编辑

回复 7# mjus


    https://msdn.microsoft.com/en-us/library/hh750089.aspx
    https://software.intel.com/en-us/node/506077

   多线程,map就没办法同步了?就算数据很多,但是只要消费掉,map还是不会太大。要求输出顺序,当然要给输入编号索引,否则怎么知道出去该谁先谁后
页: [1] 2
查看完整版本: 如何在多线程中保证结果输出顺序与数据输入顺序一样?