如何在多线程中保证结果输出顺序与数据输入顺序一样?
现假定仅一线程读取输入数据,三个(或任意个)Workers同时分别地处理数据,另一个线程输出已处理的数据,但要求其输出数据的顺序与输入数据顺序一致。为简单计,例如,若输入1,2,3,…N个整数,多线程处理后,输出的顺序仍为1,2,3,…N。下面简单程序尝试实现以上目标,但未成功,大多数情况下,输出是正确的。有时程序冻结某点,沒有全部输出所有数据。请教各位还需要什么同步机制才能保证任何情况下全部依序输岀?谢谢!
include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <semaphore.h>
#include <errno.h>
#define check(rc, msg) { \
if (!rc) \
fprintf(stderr, "%s failed \n", msg), exit(1); \
}
#define dbg(...)fprintf(stderr,__VA_ARGS__)
#define NBUFF 16 /* number of slots */
#define NP 1 /* number of producers */
#define NC 3 /* number of consumers */
int widx = 0, ridx = 0, qoidx = 0, qridx = 0;
struct rec {
struct {
int data;
size_t seq;
} buff;
sem_t full; /* number of full slots*/
sem_t empty; /* number of empty slots */
sem_t rmut; /* mutual exclusion to shared data */
sem_t wmut; /* mutual exclusion to shared data */
} shared, sha2;
size_t seq = 1;
void calc(size_t lineNbr, int val) {
/* do something with.., ie. val */
usleep(rand()%307);
sem_wait(&sha2.empty);
sem_wait(&sha2.wmut);
sha2.buff.data = val;
sha2.buff.seq = lineNbr;
dbg("QIn : %ld -> %d \n", sha2.buff.seq, sha2.buff.data);
qoidx = (qoidx+1) % NBUFF;
sem_post(&sha2.full);
sem_post(&sha2.wmut);
}
size_t cnt = 1; int fg=0;
void* seqOut(void* arg) {
for(int i=0;i<=seq && cnt<seq;i++) {
//dbg("..Loop 1.. \n");
sem_wait(&sha2.full);
fg=0;
while(!fg) {
if(sha2.buff.seq == cnt) {
fprintf(stderr,"Output : {%ld: %d}\n", cnt, sha2.buff.data);
cnt++;
fg=1;
sem_post(&sha2.empty);
//dbg("..Loop 2.. \n");
}
else {
fg=0;
qridx = (qridx+1) % NBUFF;
//dbg("..Loop 3..qridx=%d \n", qridx);
}
}
qridx = (qridx+1) % NBUFF;
}
return NULL;
}
void *Producer(void *arg) {
int i;
for(i=1; i<=50; i++) {
/* If no empty slot, wait */
sem_wait(&shared.empty);
shared.buff.data = i;
shared.buff.seq = seq++;
//fprintf(stderr,"Read : [%ld -> %d] \n", shared.buff.seq, shared.buff.data);
widx = (widx+1)%NBUFF;
sem_post(&shared.full);
}
for(int i=0; i<NC; i++) {
sem_wait(&shared.empty);
shared.buff.data = -999;
widx = (widx+1)%NBUFF;
sem_post(&shared.full);
}
return NULL;
}
void *Consumer(void *arg) {
int n, lineNbr;
while (1) {
sem_wait(&shared.full);
/* If another thread uses the buffer, wait */
sem_wait(&shared.rmut);
if(shared.buff.data == -999) {
ridx = (ridx+1)%NBUFF;
sem_post(&shared.empty);
sem_post(&shared.rmut);
return NULL;
}
n = shared.buff.data;
lineNbr = shared.buff.seq;
ridx = (ridx+1)%NBUFF;
sem_post(&shared.empty);
sem_post(&shared.rmut);
calc(lineNbr, n);
}
return NULL;
}
int main(int argc, char** argv) {
pthread_t idP, idC, seqThr;
int indexP, indexC, i;
check( sem_init(&shared.full, 0, 0 )== 0, "init full err");
check( sem_init(&sha2.full, 0, 0 )== 0, "init full2 err");
check( sem_init(&shared.empty, 0, NBUFF) == 0, "init empty err");
check( sem_init(&sha2.empty, 0, NBUFF) == 0, "init empty2 err");
check( sem_init(&shared.rmut, 0, 1) == 0, "init reader mutex err");
check( sem_init(&sha2.rmut, 0, 1) == 0, "init reader2 mutex err");
check( sem_init(&shared.wmut, 0, 1) == 0, "init writer mutex err");
check( sem_init(&sha2.wmut, 0, 1) == 0, "init writer2 mutex err");
for (i = 0; i < NP; i++) {
indexP = i+1;
check( pthread_create(&idP, NULL, Producer, &indexP) == 0, "create Pth err");
}
for (i = 0; i < NC; i++) {
indexC = i+1;
check( pthread_create(&idC, NULL, Consumer, &indexC) == 0, "create Cth err");
}
check( pthread_create(&seqThr, NULL, seqOut, NULL) == 0, "create seqThr err");
for (i = 0; i < NP; i++) {
check( pthread_join(idP, NULL) == 0, "join Pth err");
}
for (i = 0; i < NC; i++) {
check( pthread_join(idC, NULL) == 0, "join Cth err");
}
check( pthread_join(seqThr, NULL) == 0, "join seqThr err");
check( sem_destroy(&shared.full) == 0, "destroy full err");
check( sem_destroy(&sha2.full) == 0, "destroy full2 err");
check( sem_destroy(&shared.empty) == 0, "destroy empty err");
check( sem_destroy(&sha2.empty) == 0, "destroy empty2 err");
check( sem_destroy(&shared.rmut) == 0, "destroy rdmutex err");
check( sem_destroy(&sha2.rmut) == 0, "destroy rdmutex2 err");
check( sem_destroy(&shared.wmut) == 0, "destroy wrmutex err");
check( sem_destroy(&sha2.wmut) == 0, "destroy wrmutex2 err");
fprintf(stderr,"all OK \n");
return 0;
}
168,1 Bot
一种可能输出,正确的.
QIn : 1 -> 1
Output : {1: 1}
QIn : 2 -> 2
Output : {2: 2}
QIn : 4 -> 4
QIn : 3 -> 3
Output : {3: 3}
Output : {4: 4}
QIn : 5 -> 5
Output : {5: 5}
QIn : 6 -> 6
QIn : 7 -> 7
Output : {6: 6}
Output : {7: 7}
QIn : 8 -> 8
Output : {8: 8}
QIn : 10 -> 10
QIn : 11 -> 11
QIn : 12 -> 12
QIn : 9 -> 9
Output : {9: 9}
Output : {10: 10}
Output : {11: 11}
Output : {12: 12}
QIn : 13 -> 13
Output : {13: 13}
QIn : 16 -> 16
QIn : 14 -> 14
Output : {14: 14}
Output : {15: 15}
QIn : 15 -> 15
Output : {16: 16}
QIn : 17 -> 17
Output : {17: 17}
QIn : 20 -> 20
QIn : 18 -> 18
Output : {18: 18}
Output : {19: 19}
QIn : 19 -> 19
Output : {20: 20}
QIn : 21 -> 21
Output : {21: 21}
QIn : 23 -> 23
QIn : 24 -> 24
QIn : 22 -> 22
Output : {22: 22}
Output : {23: 23}
QIn : 27 -> 27
Output : {24: 24}
QIn : 26 -> 26
QIn : 28 -> 28
QIn : 25 -> 25
Output : {25: 25}
Output : {26: 26}
Output : {27: 27}
Output : {28: 28}
QIn : 32 -> 32
QIn : 30 -> 30
QIn : 31 -> 31
QIn : 29 -> 29
Output : {29: 29}
Output : {30: 30}
Output : {31: 31}
Output : {32: 32}
QIn : 34 -> 34
QIn : 33 -> 33
Output : {33: 33}
Output : {34: 34}
QIn : 36 -> 36
QIn : 35 -> 35
Output : {35: 35}
Output : {36: 36}
QIn : 37 -> 37
Output : {37: 37}
QIn : 38 -> 38
Output : {38: 38}
QIn : 41 -> 41
QIn : 42 -> 42
QIn : 39 -> 39
Output : {39: 39}
QIn : 40 -> 40
Output : {40: 40}
Output : {41: 41}
Output : {42: 42}
QIn : 43 -> 43
Output : {43: 43}
QIn : 44 -> 44
Output : {44: 44}
QIn : 47 -> 47
QIn : 48 -> 48
QIn : 45 -> 45
Output : {45: 45}
QIn : 46 -> 46
Output : {46: 46}
Output : {47: 47}
Output : {48: 48}
QIn : 49 -> 49
Output : {49: 49}
QIn : 50 -> 50
Output : {50: 50}
all OK 另一种可能输出,不正确的.
QIn : 1 -> 1
Output : {1: 1}
QIn : 2 -> 2
Output : {2: 2}
QIn : 3 -> 3
Output : {3: 3}
QIn : 4 -> 4
Output : {4: 4}
QIn : 5 -> 5
Output : {5: 5}
QIn : 7 -> 7
QIn : 6 -> 6
Output : {6: 6}
Output : {7: 7}
QIn : 8 -> 8
Output : {8: 8}
QIn : 11 -> 11
QIn : 9 -> 9
QIn : 10 -> 10
QIn : 12 -> 12
Output : {9: 9}
Output : {10: 10}
Output : {11: 11}
Output : {12: 12}
QIn : 16 -> 16
QIn : 13 -> 13
Output : {13: 13}
Output : {14: 14}
QIn : 14 -> 14
QIn : 17 -> 17
QIn : 18 -> 18
QIn : 20 -> 20
QIn : 19 -> 19
QIn : 23 -> 23
QIn : 21 -> 21
QIn : 22 -> 22
QIn : 24 -> 24
QIn : 27 -> 27
QIn : 25 -> 25
QIn : 26 -> 26
QIn : 28 -> 28
QIn : 30 -> 30
QIn : 31 -> 31
QIn : 29 -> 29
程序冻结,沒有全部输出所有数据 本帖最后由 yulihua49 于 2016-08-18 15:14 编辑
没办法。并行数据处理的必定无序。数据的序就靠索引,用索引定序吧。
处理结果存入数据库,排序后输出。
或者,每个数据配个顺序号,处理完了按顺序号插入map,然后遍历map输出。
如果是分布处理,数据分布在多个服务器上,只有存入数据库一种方法比较简单。 不会吧?我看过 python / golang code, 他们 都做到了,而且与无序输出运行时间差很少!现只考虑在一台机上运行。C++码也欢迎! 本帖最后由 lxyscls 于 2016-08-19 11:41 编辑
mjus 发表于 2016-08-18 21:14 static/image/common/back.gif
不会吧?我看过 python / golang code, 他们 都做到了,而且与无序输出运行时间差很少!现只考虑在一台机上 ...
你的N是多少?50?50和50log50可不就不差太多
map的方法前面有人讲过了,再具体一点unordered_map
worker thread 处理完后insert,key是输入序号
output thread while(1)根据序号get,get到就说明已插入,没有就retry 你不要想太简单,这里只为简单示例,实际上N为千万級,或亿上大数据处理的!用map足够安全吗?关鍵怎样处理同步问题 将输出独立出来,
用数组做参数, 一个线程处理一个数组元素。 Make sense ! But How ?please show me something magic ! Trust you ! Thank you so much !!! 本帖最后由 lxyscls 于 2016-08-20 10:49 编辑
回复 7# mjus
https://msdn.microsoft.com/en-us/library/hh750089.aspx
https://software.intel.com/en-us/node/506077
多线程,map就没办法同步了?就算数据很多,但是只要消费掉,map还是不会太大。要求输出顺序,当然要给输入编号索引,否则怎么知道出去该谁先谁后
页:
[1]
2