多个生产者一个消费者
#include "../unipc.h" #define NBUFF 10 #define MAX_PRODUCE 10 #define FILE_MODE (S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH) int nitems; int nproducers; struct { int index; int buff[NBUFF]; sem_t mutex,nempty,nstored; } shared; void *produce(void *arg) { for(;;) { sem_wait(&shared.nempty); sem_wait(&shared.mutex); if(shared.index >= nitems){ sem_post(&shared.nempty); sem_post(&shared.mutex); return NULL; } shared.buff[shared.index %NBUFF] = shared.index; shared.index++; sem_post(&shared.mutex); sem_post(&shared.nstored); (*(int*)arg)++; } } void *consume(void *arg) { int i; for(i = 0;i < nitems; i++) { sem_wait(&shared.nstored); sem_wait(&shared.mutex); if(i != shared.buff[i%NBUFF]) { printf("i = %d,buff[%d]=%d\n",i,i,shared.buff[i]); } sem_post(&shared.mutex); sem_post(&shared.nempty); } return NULL; } int main(int argc ,char *argv[]) { int i; int count[MAX_PRODUCE]; pthread_t ptid_produce[MAX_PRODUCE],ptid_consume; if(argc != 3) { printf("usage produce3 <#produces> <#items>\n"); return -1; } nproducers = atoi(argv[1]); nitems = atoi(argv[2]); if(nproducers > MAX_PRODUCE) nproducers = MAX_PRODUCE; printf("nproducers = %d,nitems=%d\n",nproducers,nitems); sem_init(&shared.mutex,0,1); sem_init(&shared.nempty,0,NBUFF); sem_init(&shared.nstored,0,0); shared.index = 0; for(i = 0;i < nproducers; i++) { count[i] = 0; pthread_create(&ptid_produce[i],NULL,produce,&count[i]); } pthread_create(&ptid_consume,NULL,consume,NULL); for(i = 0;i < nproducers; i++) { pthread_join(ptid_produce[i],NULL); printf("count[%d] = %d\n",i,count[i]); } pthread_join(ptid_consume,NULL); sem_destroy(&shared.mutex); sem_destroy(&shared.nempty); sem_destroy(&shared.nstored); return 0; }生产者线程的终止
当最后一个条目生产出来后,每个生产者线程都执行循环顶端的语句:
sem_wait(&shared.nempty);
它将nempty信号量减1。然而每个生产者线程在终止前必须给该信号量加1,因为它在最后一次走过循环时并没有往缓存区存入一个条目。