- 论坛徽章:
- 0
|
现假定仅一线程读取输入数据,三个(或任意个)Workers同时分别地处理数据,另一个线程输出已处理的数据,但要求其输出数据的顺序与输入数据顺序一致。为简单计,例如,若输入1,2,3,…N个整数,多线程处理后,输出的顺序仍为1,2,3,…N。
下面的简单程序尝试实现以上目标,但未完全成功:大多数情况下输出是正确的,但有时程序会冻结在某一点,没有输出全部数据。请教各位,还需要什么同步机制才能保证任何情况下都全部依序输出?谢谢!
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <semaphore.h>
#include <errno.h>
/*
 * check(rc, msg): terminate the program when the condition `rc` is false.
 *
 * Prints "<msg> failed" to stderr and calls exit(1).  Both arguments are
 * parenthesized so that any expression can be passed, and the body is
 * wrapped in do { } while (0) so that `check(...);` behaves as a single
 * statement (safe inside an unbraced if/else).
 */
#define check(rc, msg)                                  \
    do {                                                \
        if (!(rc)) {                                    \
            fprintf(stderr, "%s failed \n", (msg));     \
            exit(1);                                    \
        }                                               \
    } while (0)

/* printf-style trace written to stderr. */
#define dbg(...) fprintf(stderr, __VA_ARGS__)

#define NBUFF 16 /* number of slots */
#define NP 1     /* number of producers */
#define NC 3     /* number of consumers */
- int widx = 0, ridx = 0, qoidx = 0, qridx = 0;
- struct rec {
- struct {
- int data;
- size_t seq;
- } buff[NBUFF];
- sem_t full; /* number of full slots */
- sem_t empty; /* number of empty slots */
- sem_t rmut; /* mutual exclusion to shared data */
- sem_t wmut; /* mutual exclusion to shared data */
- } shared, sha2;
- size_t seq = 1;
- void calc(size_t lineNbr, int val) {
- /* do something with.., ie. val */
- usleep(rand()%307);
- sem_wait(&sha2.empty);
- sem_wait(&sha2.wmut);
- sha2.buff[qoidx].data = val;
- sha2.buff[qoidx].seq = lineNbr;
- dbg("QIn : %ld -> %d \n", sha2.buff[qoidx].seq, sha2.buff[qoidx].data);
- qoidx = (qoidx+1) % NBUFF;
- sem_post(&sha2.full);
- sem_post(&sha2.wmut);
- }
- size_t cnt = 1; int fg=0;
- void* seqOut(void* arg) {
- for(int i=0;i<=seq && cnt<seq;i++) {
- //dbg("..Loop 1.. \n");
- sem_wait(&sha2.full);
- fg=0;
- while(!fg) {
- if(sha2.buff[qridx].seq == cnt) {
- fprintf(stderr,"Output : {%ld: %d}\n", cnt, sha2.buff[qridx].data);
- cnt++;
- fg=1;
- sem_post(&sha2.empty);
- //dbg("..Loop 2.. \n");
- }
- else {
- fg=0;
- qridx = (qridx+1) % NBUFF;
- //dbg("..Loop 3..qridx=%d \n", qridx);
- }
- }
- qridx = (qridx+1) % NBUFF;
- }
- return NULL;
- }
- void *Producer(void *arg) {
- int i;
- for(i=1; i<=50; i++) {
- /* If no empty slot, wait */
- sem_wait(&shared.empty);
- shared.buff[widx].data = i;
- shared.buff[widx].seq = seq++;
- //fprintf(stderr,"Read : [%ld -> %d] \n", shared.buff[widx].seq, shared.buff[widx].data);
- widx = (widx+1)%NBUFF;
- sem_post(&shared.full);
- }
- for(int i=0; i<NC; i++) {
- sem_wait(&shared.empty);
- shared.buff[widx].data = -999;
- widx = (widx+1)%NBUFF;
- sem_post(&shared.full);
- }
- return NULL;
- }
- void *Consumer(void *arg) {
- int n, lineNbr;
- while (1) {
- sem_wait(&shared.full);
- /* If another thread uses the buffer, wait */
- sem_wait(&shared.rmut);
- if(shared.buff[ridx].data == -999) {
- ridx = (ridx+1)%NBUFF;
- sem_post(&shared.empty);
- sem_post(&shared.rmut);
- return NULL;
- }
- n = shared.buff[ridx].data;
- lineNbr = shared.buff[ridx].seq;
- ridx = (ridx+1)%NBUFF;
- sem_post(&shared.empty);
- sem_post(&shared.rmut);
- calc(lineNbr, n);
- }
- return NULL;
- }
- int main(int argc, char** argv) {
- pthread_t idP[NP], idC[NC], seqThr;
- int indexP[NP], indexC[NC], i;
- check( sem_init(&shared.full, 0, 0 )== 0, "init full err");
- check( sem_init(&sha2.full, 0, 0 )== 0, "init full2 err");
- check( sem_init(&shared.empty, 0, NBUFF) == 0, "init empty err");
- check( sem_init(&sha2.empty, 0, NBUFF) == 0, "init empty2 err");
- check( sem_init(&shared.rmut, 0, 1) == 0, "init reader mutex err");
- check( sem_init(&sha2.rmut, 0, 1) == 0, "init reader2 mutex err");
- check( sem_init(&shared.wmut, 0, 1) == 0, "init writer mutex err");
- check( sem_init(&sha2.wmut, 0, 1) == 0, "init writer2 mutex err");
- for (i = 0; i < NP; i++) {
- indexP[i] = i+1;
- check( pthread_create(&idP[i], NULL, Producer, &indexP[i]) == 0, "create Pth err");
- }
- for (i = 0; i < NC; i++) {
- indexC[i] = i+1;
- check( pthread_create(&idC[i], NULL, Consumer, &indexC[i]) == 0, "create Cth err");
- }
- check( pthread_create(&seqThr, NULL, seqOut, NULL) == 0, "create seqThr err");
- for (i = 0; i < NP; i++) {
- check( pthread_join(idP[i], NULL) == 0, "join Pth err");
- }
- for (i = 0; i < NC; i++) {
- check( pthread_join(idC[i], NULL) == 0, "join Cth err");
- }
- check( pthread_join(seqThr, NULL) == 0, "join seqThr err");
- check( sem_destroy(&shared.full) == 0, "destroy full err");
- check( sem_destroy(&sha2.full) == 0, "destroy full2 err");
- check( sem_destroy(&shared.empty) == 0, "destroy empty err");
- check( sem_destroy(&sha2.empty) == 0, "destroy empty2 err");
- check( sem_destroy(&shared.rmut) == 0, "destroy rdmutex err");
- check( sem_destroy(&sha2.rmut) == 0, "destroy rdmutex2 err");
- check( sem_destroy(&shared.wmut) == 0, "destroy wrmutex err");
- check( sem_destroy(&sha2.wmut) == 0, "destroy wrmutex2 err");
- fprintf(stderr,"all OK \n");
- return 0;
- }
- 168,1 Bot
复制代码 |
|