Linux c++ 消费者 生产者 互斥同步 问题

时间:2021-09-29 20:21:27
在Linux平台下用c++编写程序。建立一个缓冲区,是由循环单链表构成的,因为生产者可以对之前的数据进行覆盖。有两个消费者,每个线程都有自己固定的指针。也就是多个线程对缓冲区内的各个不同单元可以进行同步读写,但是在一个单元内不能有多个线程对它同时进行读写。想对缓冲区不加锁,只在线程作用的单元加锁。但是对于互斥锁的理解还不强,不能够解决问题。还请各位给个解决方案,提供思路,还有可用的函数皆可。。。
不过因为本人是新手,所以请各位大侠讲详细点。。。

5 个解决方案

#1


1、既然用的是线程,可以直接用pthread_mutex
2、对每个单元建立一个pthread_mutex锁,在读写前加锁,读写动作完成后解锁。
3、相关函数原型如下:
#include <pthread.h>

int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);

#2


是在结构体里定义pthread_mutex么?那么要怎么初始化啊?麻烦楼上再详细点。谢谢~

#3


给你一篇详细介绍的文章:
http://www.ibm.com/developerworks/cn/linux/thread/posix_threadapi/part3/

#4


刚好前不久写过,只是缓冲区跟你的要求有点区别
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<pthread.h>
#include<semaphore.h>
#include<sys/types.h>
#include<errno.h>
#include<unistd.h>
#include<signal.h>
#include<time.h>

#define NUM_THREADS_P 5  /* number of producer threads created by main() */
#define NUM_THREADS_C 5  /* number of consumer threads created by main() */
/* Number of int slots in the shared buffer. */
#define MAX_BUFFER 20
/* Argument passed to alarm() in main(): seconds before SIGALRM fires. */
#define RUN_TIME 20
int buffer[MAX_BUFFER]; /* shared buffer; consumers reset a slot to 0 after reading it */
/* Index of the next slot a producer writes / the next slot a consumer reads. */
int produce_pointer=0,consume_pointer=0;
/*
 * producer_mutex: initialised to MAX_BUFFER; producers wait on it, consumers post it.
 * consumer_mutex: initialised to 0; consumers wait on it, producers post it.
 * buffer_mutex:   initialised to 1; held around every buffer/index update and
 *                 the accompanying output.
 */
sem_t producer_mutex,consumer_mutex,buffer_mutex;
pthread_t threads_p[NUM_THREADS_P]; /* producer thread handles */
pthread_t threads_c[NUM_THREADS_C]; /* consumer thread handles */
FILE* fd; /* log file ("output.txt"), opened and closed by main() */
/* Thread entry points; tid carries a small integer id packed into the pointer. */
void *producer_thread(void *tid);
void *consumer_thread(void *tid);
/* Prints the whole buffer to stdout and to fd. */
void showbuf();
/*
 * SIGALRM handler: requests cancellation of every producer and consumer
 * thread so that the pthread_join() calls in main() can return.
 *
 * sig - the signal number passed by the signal dispatcher (unused). The
 *       parameter is declared so the function has the void (*)(int) type
 *       that signal() expects.
 *
 * NOTE(review): pthread_cancel() is not listed as async-signal-safe by
 * POSIX; consider cancelling from main() after a timed wait instead.
 */
void handler(int sig){
    int i;

    (void)sig;
    for(i=0;i<NUM_THREADS_P;i++)
        pthread_cancel(threads_p[i]);
    for(i=0;i<NUM_THREADS_C;i++)
        pthread_cancel(threads_c[i]);
}

/*
 * Program entry point.
 *
 * Opens the log file, initialises the three semaphores and the buffer,
 * starts NUM_THREADS_P producers and NUM_THREADS_C consumers, arms a
 * RUN_TIME-second alarm whose handler cancels the threads, then joins all
 * threads and releases the resources.
 *
 * Returns 0 on normal completion, 1 if the log file cannot be opened or a
 * thread cannot be created.
 */
int main(){
    int i;
    int err;

    signal(SIGALRM,handler);

    fd=fopen("output.txt","w");  /* open a file to save the result */
    if(fd==NULL){
        /* every worker writes through fd, so running without it would crash */
        perror("output.txt");
        return 1;
    }

    sem_init(&producer_mutex,0,MAX_BUFFER); /* all slots start free */
    sem_init(&consumer_mutex,0,0);          /* nothing to consume yet */
    sem_init(&buffer_mutex,0,1);            /* binary lock for the buffer */
    for(i=0;i<MAX_BUFFER;i++)
        buffer[i]=0; /* initialise the buffer */

    /* create the threads; the 1-based id is packed into the void* argument */
    for(i=0;i<NUM_THREADS_P;i++){
        err=pthread_create(&threads_p[i],NULL,producer_thread,(void*)(long)(i+1));
        if(err!=0){
            fprintf(stderr,"pthread_create(producer %d): %s\n",i+1,strerror(err));
            return 1; /* leaving main terminates any threads already started */
        }
    }
    for(i=0;i<NUM_THREADS_C;i++){
        err=pthread_create(&threads_c[i],NULL,consumer_thread,(void*)(long)(i+1));
        if(err!=0){
            fprintf(stderr,"pthread_create(consumer %d): %s\n",i+1,strerror(err));
            return 1;
        }
    }

    alarm(RUN_TIME); /* set time to run */

    /* wait for the threads to exit (they are cancelled by the alarm handler) */
    for(i=0;i<NUM_THREADS_P;i++)
        pthread_join(threads_p[i],NULL);
    for(i=0;i<NUM_THREADS_C;i++)
        pthread_join(threads_c[i],NULL);

    /* destroy the semaphores */
    sem_destroy(&producer_mutex);
    sem_destroy(&consumer_mutex);
    sem_destroy(&buffer_mutex);
    fclose(fd);
    return 0;
}

void *producer_thread(void *tid){
/*the thread can be canceled by other thread*/
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL);
while(1){
sem_wait(&producer_mutex);
srand((int)time(NULL)*(int)tid);
sleep(rand()%2+1); /*one or two seconds to produce*/
while((produce_pointer+1)%20==consume_pointer);

sem_wait(&buffer_mutex);
buffer[produce_pointer]=rand()%20+1;
produce_pointer=(produce_pointer+1)%20;
printf("producer:%d  pointer:%d  produced:%d\n",(int)tid,produce_pointer-1,buffer[produce_pointer-1]);
fprintf(fd,"producer:%d  pointer:%d  produced:%d\n",(int)tid,produce_pointer-1,buffer[produce_pointer-1]);
showbuf();
sem_post(&buffer_mutex);

sem_post(&consumer_mutex); /*inform the consumer the buffer is not empty*/
srand((int)time(NULL)*(int)tid);
sleep(rand()%5+1);   /*wait a few seconds ,then continue producing*/
}
return ((void*)0);
}

void *consumer_thread(void *tid){
/*the thread can be canceled by other thread*/
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL);
while(1){
sem_wait(&consumer_mutex);
srand((int)time(NULL)*(int)tid);
sleep(rand()%2+1); /*one or two seconds to consume*/

sem_wait(&buffer_mutex);
printf("consumer:%d  pointer:%d  consumed:%d\n",(int)tid,consume_pointer,buffer[consume_pointer]);
fprintf(fd,"consumer:%d  pointer:%d  consumed:%d\n",(int)tid,consume_pointer,buffer[consume_pointer]);
buffer[consume_pointer]=0;
consume_pointer=(consume_pointer+1)%20;
showbuf();
sem_post(&buffer_mutex);

sem_post(&producer_mutex); 
srand((int)time(NULL)*(int)tid);
sleep(rand()%5+1);   /*wait a few seconds ,then continue consuming*/
}
return ((void*)0);
}

/*
 * Dump every slot of the shared buffer on one line, prefixed with
 * "buffer:", to both stdout and the log file fd.
 */
void showbuf(){
    const int *slot=buffer;
    const int *const end=buffer+MAX_BUFFER;

    printf("buffer:");
    fprintf(fd,"buffer:");
    while(slot!=end){
        printf("%d ",*slot);
        fprintf(fd,"%d ",*slot);
        ++slot;
    }
    printf("\n");
    fprintf(fd,"\n");
}

#5


好像还有点不同额,不是用信号灯的。楼主就自己改下,一楼已经给了函数原型了

#1


1、既然用的是线程,可以直接用pthread_mutex
2、对每个单元建立一个pthread_mutex锁,在读写前加锁,读写动作完成后解锁。
3、相关函数原型如下:
#include <pthread.h>

int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);

#2


是在结构体里定义pthread_mutex么?那么要怎么初始化啊?麻烦楼上再详细点。谢谢~

#3


给你一篇详细介绍的文章:
http://www.ibm.com/developerworks/cn/linux/thread/posix_threadapi/part3/

#4


刚好前不久写过,只是缓冲区跟你的要求有点区别
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<pthread.h>
#include<semaphore.h>
#include<sys/types.h>
#include<errno.h>
#include<unistd.h>
#include<signal.h>
#include<time.h>

#define NUM_THREADS_P 5  /* number of producer threads created by main() */
#define NUM_THREADS_C 5  /* number of consumer threads created by main() */
/* Number of int slots in the shared buffer. */
#define MAX_BUFFER 20
/* Argument passed to alarm() in main(): seconds before SIGALRM fires. */
#define RUN_TIME 20
int buffer[MAX_BUFFER]; /* shared buffer; consumers reset a slot to 0 after reading it */
/* Index of the next slot a producer writes / the next slot a consumer reads. */
int produce_pointer=0,consume_pointer=0;
/*
 * producer_mutex: initialised to MAX_BUFFER; producers wait on it, consumers post it.
 * consumer_mutex: initialised to 0; consumers wait on it, producers post it.
 * buffer_mutex:   initialised to 1; held around every buffer/index update and
 *                 the accompanying output.
 */
sem_t producer_mutex,consumer_mutex,buffer_mutex;
pthread_t threads_p[NUM_THREADS_P]; /* producer thread handles */
pthread_t threads_c[NUM_THREADS_C]; /* consumer thread handles */
FILE* fd; /* log file ("output.txt"), opened and closed by main() */
/* Thread entry points; tid carries a small integer id packed into the pointer. */
void *producer_thread(void *tid);
void *consumer_thread(void *tid);
/* Prints the whole buffer to stdout and to fd. */
void showbuf();
/*
 * SIGALRM handler: requests cancellation of every producer and consumer
 * thread so that the pthread_join() calls in main() can return.
 *
 * sig - the signal number passed by the signal dispatcher (unused). The
 *       parameter is declared so the function has the void (*)(int) type
 *       that signal() expects.
 *
 * NOTE(review): pthread_cancel() is not listed as async-signal-safe by
 * POSIX; consider cancelling from main() after a timed wait instead.
 */
void handler(int sig){
    int i;

    (void)sig;
    for(i=0;i<NUM_THREADS_P;i++)
        pthread_cancel(threads_p[i]);
    for(i=0;i<NUM_THREADS_C;i++)
        pthread_cancel(threads_c[i]);
}

/*
 * Program entry point.
 *
 * Opens the log file, initialises the three semaphores and the buffer,
 * starts NUM_THREADS_P producers and NUM_THREADS_C consumers, arms a
 * RUN_TIME-second alarm whose handler cancels the threads, then joins all
 * threads and releases the resources.
 *
 * Returns 0 on normal completion, 1 if the log file cannot be opened or a
 * thread cannot be created.
 */
int main(){
    int i;
    int err;

    signal(SIGALRM,handler);

    fd=fopen("output.txt","w");  /* open a file to save the result */
    if(fd==NULL){
        /* every worker writes through fd, so running without it would crash */
        perror("output.txt");
        return 1;
    }

    sem_init(&producer_mutex,0,MAX_BUFFER); /* all slots start free */
    sem_init(&consumer_mutex,0,0);          /* nothing to consume yet */
    sem_init(&buffer_mutex,0,1);            /* binary lock for the buffer */
    for(i=0;i<MAX_BUFFER;i++)
        buffer[i]=0; /* initialise the buffer */

    /* create the threads; the 1-based id is packed into the void* argument */
    for(i=0;i<NUM_THREADS_P;i++){
        err=pthread_create(&threads_p[i],NULL,producer_thread,(void*)(long)(i+1));
        if(err!=0){
            fprintf(stderr,"pthread_create(producer %d): %s\n",i+1,strerror(err));
            return 1; /* leaving main terminates any threads already started */
        }
    }
    for(i=0;i<NUM_THREADS_C;i++){
        err=pthread_create(&threads_c[i],NULL,consumer_thread,(void*)(long)(i+1));
        if(err!=0){
            fprintf(stderr,"pthread_create(consumer %d): %s\n",i+1,strerror(err));
            return 1;
        }
    }

    alarm(RUN_TIME); /* set time to run */

    /* wait for the threads to exit (they are cancelled by the alarm handler) */
    for(i=0;i<NUM_THREADS_P;i++)
        pthread_join(threads_p[i],NULL);
    for(i=0;i<NUM_THREADS_C;i++)
        pthread_join(threads_c[i],NULL);

    /* destroy the semaphores */
    sem_destroy(&producer_mutex);
    sem_destroy(&consumer_mutex);
    sem_destroy(&buffer_mutex);
    fclose(fd);
    return 0;
}

void *producer_thread(void *tid){
/*the thread can be canceled by other thread*/
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL);
while(1){
sem_wait(&producer_mutex);
srand((int)time(NULL)*(int)tid);
sleep(rand()%2+1); /*one or two seconds to produce*/
while((produce_pointer+1)%20==consume_pointer);

sem_wait(&buffer_mutex);
buffer[produce_pointer]=rand()%20+1;
produce_pointer=(produce_pointer+1)%20;
printf("producer:%d  pointer:%d  produced:%d\n",(int)tid,produce_pointer-1,buffer[produce_pointer-1]);
fprintf(fd,"producer:%d  pointer:%d  produced:%d\n",(int)tid,produce_pointer-1,buffer[produce_pointer-1]);
showbuf();
sem_post(&buffer_mutex);

sem_post(&consumer_mutex); /*inform the consumer the buffer is not empty*/
srand((int)time(NULL)*(int)tid);
sleep(rand()%5+1);   /*wait a few seconds ,then continue producing*/
}
return ((void*)0);
}

void *consumer_thread(void *tid){
/*the thread can be canceled by other thread*/
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL);
while(1){
sem_wait(&consumer_mutex);
srand((int)time(NULL)*(int)tid);
sleep(rand()%2+1); /*one or two seconds to consume*/

sem_wait(&buffer_mutex);
printf("consumer:%d  pointer:%d  consumed:%d\n",(int)tid,consume_pointer,buffer[consume_pointer]);
fprintf(fd,"consumer:%d  pointer:%d  consumed:%d\n",(int)tid,consume_pointer,buffer[consume_pointer]);
buffer[consume_pointer]=0;
consume_pointer=(consume_pointer+1)%20;
showbuf();
sem_post(&buffer_mutex);

sem_post(&producer_mutex); 
srand((int)time(NULL)*(int)tid);
sleep(rand()%5+1);   /*wait a few seconds ,then continue consuming*/
}
return ((void*)0);
}

/*
 * Dump every slot of the shared buffer on one line, prefixed with
 * "buffer:", to both stdout and the log file fd.
 */
void showbuf(){
    const int *slot=buffer;
    const int *const end=buffer+MAX_BUFFER;

    printf("buffer:");
    fprintf(fd,"buffer:");
    while(slot!=end){
        printf("%d ",*slot);
        fprintf(fd,"%d ",*slot);
        ++slot;
    }
    printf("\n");
    fprintf(fd,"\n");
}

#5


好像还有点不同额,不是用信号灯的。楼主就自己改下,一楼已经给了函数原型了