linux多线程编程(C):信号量实现的线程安全队列

时间:2022-02-19 15:14:54
用信号量实现的线程安全队列。 简单有用的示例程序, 比起互斥量的实现在多线程时效率更好。
cir_queue.h
  1. /*
  2.  * \File
  3.  * cir_queue.h
  4.  * \Brief
  5.  * circular queue
  6.  */
  7. #ifndef __CIR_QUEUE_H__
  8. #define __CIR_QUEUE_H__

  9. #define QUE_SIZE 8

  10. typedef int DataType;
  11. typedef struct cir_queue_t
  12. {
  13.   DataType data[QUE_SIZE];
  14.   int front;
  15.   int rear;
  16.   int count;
  17. }cir_queue_t;

  18. extern sem_t queue_sem; 

  19. void init_cir_queue(cir_queue_t* q);
  20. int is_empty_cir_queue(cir_queue_t* q);
  21. int is_full_cir_queue(cir_queue_t* q);
  22. void push_cir_queue(cir_queue_t* q, DataType x);
  23. DataType pop_cir_queue(cir_queue_t* q);
  24. DataType top_cir_queue(cir_queue_t* q);
  25. void destroy_cir_queue(cir_queue_t* q);
  26. void print_queue(cir_queue_t* q);

  27. #endif
main.c
  1. /*
  2.  * \File
  3.  * main.c
  4.  * \Breif
  5.  * Thread-safe circular-queue implemented by semaphore
  6.  * \Author
  7.  * Hank.yan
  8.  */
  9. #include <stdio.h>
  10. #include <stdlib.h>
  11. #include <unistd.h>
  12. #include <string.h>
  13. #include <pthread.h>
  14. #include <semaphore.h>

  15. #include "cir_queue.h"

  16. void* thread_queue(void *arg);

  17.     
  18. /*
  19.  * \Func
  20.  * main
  21.  */
  22. int main(int argc, char* argv[])
  23. {
  24.   int res;
  25.   cir_queue_t cq;
  26.   DataType e;
  27.  
  28.   pthread_t a_thread, b_thread;
  29.   void* thread_result;

  30.   init_cir_queue(&cq);

  31.   push_cir_queue(&cq, 1);
  32.   push_cir_queue(&cq, 2);
  33.   push_cir_queue(&cq, 3);

  34.   print_queue(&cq);

  35.   res = pthread_create(&a_thread, NULL, thread_queue, (void*)&cq);
  36.   if (res != 0)
  37.   {
  38.     perror("Thread creation failed.");
  39.     exit(EXIT_FAILURE);
  40.   }

  41.   e = pop_cir_queue(&cq);    
  42.   e = pop_cir_queue(&cq);    
  43.   print_queue(&cq);

  44.   push_cir_queue(&cq, 9);
  45.   push_cir_queue(&cq, 100);

  46.   print_queue(&cq);

  47.   res = pthread_create(&b_thread, NULL, thread_queue, (void*)&cq);
  48.   if (res != 0)
  49.   {
  50.     perror("Thread creation failed.");
  51.     exit(EXIT_FAILURE);
  52.   }
  53.   e = pop_cir_queue(&cq);    


  54.   push_cir_queue(&cq, 20);
  55.   print_queue(&cq);

  56.   printf("Waiting for thread to finish...\n");
  57.   res = pthread_join(a_thread, &thread_result);
  58.   if (res != 0)
  59.   {
  60.     perror("Thread join failed.");
  61.     exit(EXIT_FAILURE);
  62.   }
  63.   print_queue(&cq);

  64.   printf("Waiting for thread to finish...\n");
  65.   res = pthread_join(b_thread, &thread_result);
  66.   if (res != 0)
  67.   {
  68.     perror("Thread join failed.");
  69.     exit(EXIT_FAILURE);
  70.   }

  71.   destroy_cir_queue(&cq);

  72.   printf("Thread joined, it returned %s\n", (char*)thread_result); 
  73.   exit(EXIT_SUCCESS);
  74. }


  75. void *thread_queue(void *cirqueue)
  76. {
  77.   int flag;
  78.   DataType element;

  79.   print_queue((cir_queue_t*)cirqueue);

  80.   flag = is_empty_cir_queue((cir_queue_t*)cirqueue);

  81.   print_queue((cir_queue_t*)cirqueue);
  82.   element = pop_cir_queue((cir_queue_t*)cirqueue);
  83.   element = pop_cir_queue((cir_queue_t*)cirqueue);
  84.   print_queue((cir_queue_t*)cirqueue);

  85.   push_cir_queue((cir_queue_t*)cirqueue, 5);
  86.   print_queue((cir_queue_t*)cirqueue);

  87.   push_cir_queue((cir_queue_t*)cirqueue, 99);
  88.   push_cir_queue((cir_queue_t*)cirqueue, 1000);
  89.   push_cir_queue((cir_queue_t*)cirqueue, 88);

  90.   print_queue((cir_queue_t*)cirqueue);
  91.   
  92.   pthread_exit("Thank you for the cpu time.");
  93. }

cir_queue.c
  1. /*
  2.  * \File
  3.  * cir_queue.c
  4.  */
  5. #include <stdio.h>
  6. #include <stdlib.h>
  7. #include <unistd.h>
  8. #include <string.h>
  9. #include <pthread.h>
  10. #include <semaphore.h>

  11. #include "cir_queue.h"

  12. sem_t queue_sem;
  13. /*
  14.  * \Func
  15.  *
  16.  */
  17. void init_cir_queue(cir_queue_t *q)
  18. {    
  19.   int res;

  20.   /* Create semaphore */
  21.   res = sem_init(&queue_sem, 0, QUE_SIZE);
  22.   if (res != 0)
  23.   {
  24.     perror("Semaphore init failed.\n");
  25.     exit(EXIT_FAILURE);
  26.   }
  27.   memset(q->data, 0, QUE_SIZE*sizeof(DataType));

  28.   q->front = q->rear = 0;
  29.   q->count = 0;
  30. }

  31. /*
  32.  * \Func
  33.  *
  34.  */
  35. int is_empty_cir_queue(cir_queue_t *q)
  36. {
  37.   int empty_flag;

  38.   sem_wait(&queue_sem);    

  39.   empty_flag = q->front == q->rear;

  40.   sem_post(&queue_sem);

  41.   return empty_flag;
  42. }

  43. /*
  44.  * \Func
  45.  *
  46.  */
  47. int is_full_cir_queue(cir_queue_t *q)
  48. {
  49.   int full_flag;

  50.   sem_wait(&queue_sem);    

  51.   full_flag = q->rear == QUE_SIZE - 1 + q->front;

  52.   sem_post(&queue_sem);

  53.   return full_flag;
  54. }

  55. /*
  56.  * \Func
  57.  *
  58.  */
  59. void push_cir_queue(cir_queue_t *q, DataType x)
  60. {

  61.   if (is_full_cir_queue(q))
  62.   {
  63.     printf("queue overflow.\n");
  64.     return ;
  65.   }

  66.   sem_wait(&queue_sem);    

  67.   q->count++;
  68.   q->data[q->rear] = x;
  69.   q->rear = (q->rear+1) % QUE_SIZE;

  70.   sem_post(&queue_sem);
  71.     
  72. }

  73. /*
  74.  * \Func
  75.  *
  76.  */
  77. DataType pop_cir_queue(cir_queue_t *q)
  78. {
  79.   DataType temp;

  80.   if (is_empty_cir_queue(q))
  81.   {
  82.     printf("queue empty.\n");
  83.     return 0;
  84.   }

  85.   sem_wait(&queue_sem);    

  86.   temp = q->data[q->front];
  87.   q->data[q->front] = 0;
  88.   q->count--;
  89.   q->front = (q->front+1) % QUE_SIZE;

  90.   sem_post(&queue_sem);

  91.   return temp;
  92. }

  93. /*
  94.  * \Func
  95.  *
  96.  */
  97. DataType top_cir_queue(cir_queue_t *q)
  98. {
  99.   DataType x; 

  100.   if (is_empty_cir_queue(q))
  101.   {
  102.     printf("queue is empty.\n");
  103.     return 0;
  104.   }

  105.   sem_wait(&queue_sem);    

  106.   x = q->data[q->front];

  107.   sem_post(&queue_sem);

  108.   return x;
  109. }

  110. void destroy_cir_queue(cir_queue_t *q)
  111. { 
  112.   sem_destroy(&queue_sem);

  113.   return;    
  114. }

  115. void print_queue(cir_queue_t* q)
  116. {
  117.   int index;
  118.   if (is_empty_cir_queue(q))
  119.   {
  120.     printf("queue is empty.\n");
  121.     return;
  122.   }

  123.   sem_wait(&queue_sem);    
  124.   printf("QUEUE: ");
  125.   for (index = 0; index < QUE_SIZE; index++)
  126.   {
  127.     printf(" %d ", q->data[index]);
  128.   }
  129.   printf("\n");

  130.   sem_post(&queue_sem);

  131.   return;
  132. }

makefile
  1. OBJECTS = main.o cir_queue.o
  2. CC = gcc
  3. CFLAGS = -D_REENTRANT -lpthread --Wall


  4. thrd_safe_queue: $(OBJECTS)
  5.   $(CC) $(CFLAGS) -o thrd_safe_queue $(OBJECTS)

  6. main.o: cir_queue.h
  7. cir_queue.o: cir_queue.h


  8. .PHONY:clean
  9. clean:
  10.   rm thrd_safe_queue $(OBJECTS)