笔记整理--Linux多线程

时间:2023-03-08 22:10:56
笔记整理--Linux多线程

Unix高级环境编程系列笔记 (2013/11/17 14:26:38)

Unix高级环境编程系列笔记

通过这篇文字,您将能够解答如下问题:

#include int pthread_equal(pthread_t tid1, pthread_t tid2);Returns: nonzero if equal, 0 otherwise

使用pthread_self函数来获取线程的ID:

#include pthread_t pthread_self(void);Returns: the thread ID of the calling thread

2.如何创建一个新线程?

使用pthread_create函数创建一个新线程。

以下是代码片段:
#include  
int pthread_create(pthread_t *restrict tidp, const pthread_attr_t *restrict attr, void *(*start_rtn)(void), void *restrict arg);
Returns: 0 if OK, error number on failure

当该函数成功返回的时候,tidp所指向的内存位置将被分配给新创建的带有thread ID的线程。

attr用来定制各种线程参数。

新创建的线程将在start_rtn函数所指向的地址开始运行,该函数接受一个参数无类型的指针arg作为参数

线程创建时无法保证哪个线程会先运行。新创建的线程可以访问进程地址空间,并且继承了调用线程的浮点环境以及信号量掩码,但对于线程的未决信号量也将会被清除。

下面的这段程序创建新的线程,并打印线程id,新线程通过pthread_self函数获取自己的线程ID。

#include "apue.h"
#include pthread_t ntid;void printids(const char *s){    pid_t      pid;    pthread_t  tid;    pid = getpid();    tid = pthread_self();    printf("%s pid %u tid %u (0x%x)\\n", s, (unsigned int)pid,      (unsigned int)tid, (unsigned int)tid);}void * thr_fn(void *arg){    printids("new thread: ");    return((void *)0);}int main(void){    int     err;    err = pthread_create(&ntid, NULL, thr_fn, NULL);    if (err != 0)        err_quit("can\'t create thread: %s\\n", strerror(err));    printids("main thread:");    sleep(1);    exit(0);}

3. 如何实现单个线程的退出?

如果一个线程调用了exit, _Exit, 或者_exit,将导致整个进程的终止。要实现单个线程的退出,可以采用如下方式:

o 线程可以简单的从start routine返回,返回值就是线程的退出代码。

o 线程可以被同一进程中的其它线程终止。

o 线程调用pthread_exit

#include void pthread_exit(void *rval_ptr);

4.如何使调用线程阻塞等待指定线程的退出,并获得退出线程的返回码?

#include int pthread_join(pthread_t thread, void **rval_ptr);Returns: 0 if OK, error number on failure

调用线程将会被阻塞直到指定的线程终止。如果线程简单的从start routine返回则rval_ptr将包含返回代码。如果线程是被撤销(调用pthread_exit)的,rval_ptr指向的内存地址将被设置为PTHREAD_CANCELED.

通过调用pthread_join,我们自动的将一个线程变成分离状态,这样就可以实现资源的回收。如果线程已经处于分离状态,调用pthread_join将会失败,并返回EINVAL。

如果我们对于线程的返回值不感兴趣,可以将rval_ptr设置成NULL。

一段有缺陷的代码:

#include "apue.h"
#include struct foo {    int a, b, c, d;};voidprintfoo(const char *s, const struct foo *fp){    printf(s);    printf("  structure at 0x%x\\n", (unsigned)fp);    printf("  foo.a = %d\\n", fp->a);    printf("  foo.b = %d\\n", fp->b);    printf("  foo.c = %d\\n", fp->c);    printf("  foo.d = %d\\n", fp->d);}void *thr_fn1(void *arg){    struct foo  foo = {1, 2, 3, 4};    printfoo("thread 1:\\n", &foo);    pthread_exit((void *)&foo);}void *thr_fn2(void *arg){    printf("thread 2: ID is %d\\n", pthread_self());    pthread_exit((void *)0);}intmain(void){    int         err;    pthread_t   tid1, tid2;    struct foo  *fp;    err = pthread_create(&tid1, NULL, thr_fn1, NULL);    if (err != 0)        err_quit("can\'t create thread 1: %s\\n", strerror(err));    err = pthread_join(tid1, (void *)&fp);    if (err != 0)        err_quit("can\'t join with thread 1: %s\\n", strerror(err));    sleep(1);    printf("parent starting second thread\\n");    err = pthread_create(&tid2, NULL, thr_fn2, NULL);    if (err != 0)        err_quit("can\'t create thread 2: %s\\n", strerror(err));    sleep(1);    printfoo("parent:\\n", fp);    exit(0);}

注意,pthread_create 和 pthread_exit函数的无类型指针可以传递复杂的结构信息,但这个结构所使用的内存在调用者完成后必须仍然有效(分配在堆上或者是静态变量),否则就会出现使用无效的错误。这段代码中thr_fn1函数中变量foo分配在栈上,但该线程退出后,主线程通过pthread_join获取foo的地址并进行操作(调用printfoo函数时)就会出现错误,因为此时thr_fn1已经退出它的栈已经被销毁。

5.如何通过一个线程让另外一个线程退出?

调用pthread_cancel函数将导致tid所指向的线程终止运行。但是,一个线程可以选择忽略其它线程控制该线程何时退出。注意,该函数并不等待线程终止,它仅仅提出要求。

#include int pthread_cancel(pthread_t tid);Returns: 0 if OK, error number on failure

6.如何实现线程退出时的清理动作?

线程可以建立多个清理处理程序,这些程序记录在栈中,也就是说他们的执行顺序与注册顺序想法。使用如下函数注册清理函数:

void pthread_cleanup_push(void (*rtn)(void *), void *arg);

void pthread_cleanup_pop(int execute);

rtn将被调用,并传以arg参数,引起该函数调用的情况如下:

o 调用pthread_exit

o 对于退出请求的反应

o 以非0参数调用pthread_cleanup_push

如果pthread_cleanup_pop的参数非0则仅仅移除该处理函数而不执行。

如果函数已经处于分离状态,则当它退出时线程底层的存储资源会被立即回收。处于分离状态的线程,如果调用pthread_join来等待其退出将会出现错误。

通过下列函数可以让进程处于分离状态:

#include int pthread_detach(pthread_t tid);Returns: 0 if OK, error number on failure

7.Unix系统如何实现线程之间的同步?

使用pthreads mutual-exclusion interfaces。引入了mutex,用pthread_mutex_t类型来表示。在使用这个变量之前,我们首先要将其初始化,或者赋值为PTHREAD_MUTEX_INITIALIZER(仅仅用于静态分配的mutexs),或者调用pthread_mutex_init。如果我们动态的为mutex分配空间(例如通过调用malloc),我们需要在调用free释放内存之前调用pthread_mutex_destroy。

函数定义如下:

以下是代码片段:
#include  
int pthread_mutex_init(pthread_mutex_t *restrict mutex,  const pthread_mutexattr_t *restrict attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);
Both return: 0 if OK, error number on failure

初始化mutex时参数attr用来指定mutex的属性,要使用默认值将它设置为NULL。

使用如下函数对mutex进行加锁或解锁:

int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
All return: 0 if OK, error number on failure

注意当mutex已经被加锁则 pthread_mutex_lock会阻塞。如果一个线程无法忍受阻塞,可以调用pthread_mutex_trylock来加锁,加锁失败则立即返回EBUSY。

8.什么情况会发生线程死锁,如何避免死锁?

如果一个线程对mutex加两次锁则显然会导致死锁。但实际上死锁的情况要复杂的多:when we use more than one mutex in our programs, a deadlock can occur if we allow one thread to hold a mutex and block while trying to lock a second mutex at the same time that another thread holding the second mutex tries to lock the first mutex. Neither thread can proceed, because each needs a resource that is held by the other, so we have a deadlock.

死锁可以通过控制加锁的顺序来避免。有两个mutex A和B,如果所有的线程总是先对A加锁再对B加锁就不会产生死锁。但实际应用中可能很难保证这种顺序加锁的方式,这种情况下,可以使用pthread_mutex_trylock来避免死锁的发生。

9.读写锁的使用方法。

读写锁的初始化与销毁:

以下是代码片段:
#include  
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
Both return: 0 if OK, error number on failure

对于读写锁的初始化与销毁独占锁类似。

加锁与解锁:

#include int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);All return: 0 if OK, error number on failure

对于读者的数量会有限制,因此调用 pthread_rwlock_rdlock时需要检查返回值。

在正确使用的情况下,不需要检查pthread_rwlock_wrlock和pthread_rwlock_unlock的返回值。

条件加锁:

#include int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);Both return: 0 if OK, error number on failure

10.什么是条件变量,它有什么作用?

条件变量是线程可用的另外一种同步机制。条件变量给多个线程提供了一个会合的场所。条件变量与互斥量一起使用时,允许线程以无竞争的方式等待特定条件的发生。条件本身是由互斥量保护的。线程在改变状态前必须首先锁住互斥量,其它线程在获得互斥量之前不会觉察到这种变化。

11.如何使用条件变量?

条件变量的类型为pthread_cond_t ,其初始化与销毁的方式与mutex类似,注意静态变量可以通过指定常量PTHREAD_COND_INITIALIZER来进行初始化。

#include int pthread_cond_init(pthread_cond_t *restrict cond, pthread_condattr_t *restrict attr);int pthread_cond_destroy(pthread_cond_t *cond);Both return: 0 if OK, error number on failure

使用pthread_cond_wait来等待条件变成真。

函数定义如下:

以下是代码片段:
#include  
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
int pthread_cond_timedwait(pthread_cond_t *restrict cond,  pthread_mutex_t *restrict mutex, const struct timespec *restrict timeout);

Both return: 0 if OK, error number on failure

调用者把锁住的mutex传递给pthread_cond_wait,函数把调用线程放到等待条件变量的线程列表上,然后对互斥量解锁,这两个操作是原子操作。这样就关闭了条件检查和线程进入休眠状态等待条件改变这两个操作之间的时间窗口。pthread_cond_wait返回时,mutex会再次被锁住。

注意,调用成功返回后,线程需要重新计算条件变量,因为其它线程可能已经改变了条件。

有两个函数用于通知线程一个条件已经被满足。pthread_cond_signal函数用来唤醒一个等待条件满足的线程, pthread_cond_broadcast用来唤醒所有等待条件满足的线程。

他们的定义为:

#include int pthread_cond_signal(pthread_cond_t *cond);int pthread_cond_broadcast(pthread_cond_t *cond);Both return: 0 if OK, error number on failure

下面的这段代码实现了类似于生产者消费者模型的程序,生产者通过enqueue_msg将消息放入队列,并发送信号通知给消费者线程。消费者线程被唤醒然后处理消息。

#include struct msg {    struct msg *m_next;    /* ... more stuff here ... */};struct msg *workq;pthread_cond_t qready = PTHREAD_COND_INITIALIZER;pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;voidprocess_msg(void){    struct msg *mp;    for (;;) {        pthread_mutex_lock(&qlock);        while (workq == NULL)            pthread_cond_wait(&qready, &qlock); /*get msg from the queue*/        mp = workq;        workq = mp->m_next;        pthread_mutex_unlock(&qlock);        /* now process the message mp */    }}voidenqueue_msg(struct msg *mp){    pthread_mutex_lock(&qlock);    /*put msg in queue*/    mp->m_next = workq;    workq = mp;    pthread_mutex_unlock(&qlock);    pthread_cond_signal(&qready);}

在pthread_cond_signal发送消息之前并不需要占用锁,因为一旦线程被唤醒后通过while发现没有要处理的msg存在则会再次陷入睡眠。如果系统不能容忍这种竞争环境,则需要在unlock之前调用cond_signal,但是在多处理器机器上,这样会导致多线程被唤醒然后立即进入阻塞(cond_signal唤醒线程,但由于我们仍占用着锁,所以这些线程又会立即阻塞)。

Linux多线程编程详细解析----条件变量 pthread_cond_t - IT-Homer - 博客频道 - CSDN.NET - Google Chrome (2013/9/5 22:01:51)

分类: Linux/Shell2010-11-24 11:30 9642人阅读 评论(0) 收藏 举报

Linux操作系统下的多线程编程详细解析----条件变量

 

1.初始化条件变量pthread_cond_init

#include <pthread.h>

int pthread_cond_init(pthread_cond_t *cv,

const pthread_condattr_t *cattr);

返回值:函数成功返回0;任何其他返回值都表示错误

初始化一个条件变量。当参数cattr为空指针时,函数创建的是一个缺省的条件变量。否则条件变量的属性将由cattr中的属性值来决定。调用 pthread_cond_init函数时,参数cattr为空指针等价于cattr中的属性为缺省属性,只是前者不需要cattr所占用的内存开销。这个函数返回时,条件变量被存放在参数cv指向的内存中。

可以用宏PTHREAD_COND_INITIALIZER来初始化静态定义的条件变量,使其具有缺省属性。这和用pthread_cond_init函数动态分配的效果是一样的。初始化时不进行错误检查。如:

pthread_cond_t cv = PTHREAD_COND_INITIALIZER;

不能由多个线程同时初始化一个条件变量。当需要重新初始化或释放一个条件变量时,应用程序必须保证这个条件变量未被使用。

 

2.阻塞在条件变量上pthread_cond_wait

#include <pthread.h>

int pthread_cond_wait(pthread_cond_t *cv,

pthread_mutex_t *mutex);

返回值:函数成功返回0;任何其他返回值都表示错误

函数将解锁mutex参数指向的互斥锁,并使当前线程阻塞在cv参数指向的条件变量上。

被阻塞的线程可以被pthread_cond_signal函数,pthread_cond_broadcast函数唤醒,也可能在被信号中断后被唤醒。

pthread_cond_wait函数的返回并不意味着条件的值一定发生了变化,必须重新检查条件的值。

pthread_cond_wait函数返回时,相应的互斥锁将被当前线程锁定,即使是函数出错返回。

一般一个条件表达式都是在一个互斥锁的保护下被检查。当条件表达式未被满足时,线程将仍然阻塞在这个条件变量上。当另一个线程改变了条件的值并向条件变量发出信号时,等待在这个条件变量上的一个线程或所有线程被唤醒,接着都试图再次占有相应的互斥锁。

阻塞在条件变量上的线程被唤醒以后,直到pthread_cond_wait()函数返回之前条件的值都有可能发生变化。所以函数返回以后,在锁定相应的互斥锁之前,必须重新测试条件值。最好的测试方法是循环调用pthread_cond_wait函数,并把满足条件的表达式置为循环的终止条件。如:

pthread_mutex_lock();

while (condition_is_false)

pthread_cond_wait();

pthread_mutex_unlock();

阻塞在同一个条件变量上的不同线程被释放的次序是不一定的。

注意:pthread_cond_wait()函数是退出点,如果在调用这个函数时,已有一个挂起的退出请求,且线程允许退出,这个线程将被终止并开始执行善后处理函数,而这时和条件变量相关的互斥锁仍将处在锁定状态。

 

3.解除在条件变量上的阻塞pthread_cond_signal

#include <pthread.h>

int pthread_cond_signal(pthread_cond_t *cv);

返回值:函数成功返回0;任何其他返回值都表示错误

函数被用来释放被阻塞在指定条件变量上的一个线程。

必须在互斥锁的保护下使用相应的条件变量。否则对条件变量的解锁有可能发生在锁定条件变量之前,从而造成死锁。

唤醒阻塞在条件变量上的所有线程的顺序由调度策略决定,如果线程的调度策略是SCHED_OTHER类型的,系统将根据线程的优先级唤醒线程。

如果没有线程被阻塞在条件变量上,那么调用pthread_cond_signal()将没有作用。

 

4.阻塞直到指定时间pthread_cond_timedwait

#include <pthread.h>

#include <time.h>

int pthread_cond_timedwait(pthread_cond_t *cv,

pthread_mutex_t *mp, const structtimespec * abstime);

返回值:函数成功返回0;任何其他返回值都表示错误

函数到了一定的时间,即使条件未发生也会解除阻塞。这个时间由参数abstime指定。函数返回时,相应的互斥锁往往是锁定的,即使是函数出错返回。

注意:pthread_cond_timedwait函数也是退出点。

超时时间参数是指一天中的某个时刻。使用举例:

pthread_timestruc_t to;

to.tv_sec = time(NULL) + TIMEOUT;

to.tv_nsec = 0;

超时返回的错误码是ETIMEDOUT。

 

5.释放阻塞的所有线程pthread_cond_broadcast

#include <pthread.h>

int pthread_cond_broadcast(pthread_cond_t *cv);

返回值:函数成功返回0;任何其他返回值都表示错误

函数唤醒所有被pthread_cond_wait函数阻塞在某个条件变量上的线程,参数cv被用来指定这个条件变量。当没有线程阻塞在这个条件变量上时,pthread_cond_broadcast函数无效。

由于pthread_cond_broadcast函数唤醒所有阻塞在某个条件变量上的线程,这些线程被唤醒后将再次竞争相应的互斥锁,所以必须小心使用pthread_cond_broadcast函数。

 

6.释放条件变量pthread_cond_destroy

#include <pthread.h>

int pthread_cond_destroy(pthread_cond_t *cv);

返回值:函数成功返回0;任何其他返回值都表示错误

释放条件变量。

注意:条件变量占用的空间并未被释放。

 

7.唤醒丢失问题

在线程未获得相应的互斥锁时调用pthread_cond_signal或pthread_cond_broadcast函数可能会引起唤醒丢失问题。

唤醒丢失往往会在下面的情况下发生:

  1. 一个线程调用pthread_cond_signal或pthread_cond_broadcast函数;
  2. 另一个线程正处在测试条件变量和调用pthread_cond_wait函数之间;
  3. 没有线程正在处在阻塞等待的状态下。

 

转载声明: 本文转自 http://pzs1237.blog.163.com/blog/static/29813006200952335454934/

 

===============================================================================

 

条件锁pthread_cond_t

 

说明,
等待线程
1。使用pthread_cond_wait前要先加锁
2。pthread_cond_wait内部会解锁,然后等待条件变量被其它线程激活
3。pthread_cond_wait被激活后会再自动加锁

激活线程:
1。加锁(和等待线程用同一个锁)
2。pthread_cond_signal发送信号
3。解锁
激活线程的上面三个操作在运行时间上都在等待线程的pthread_cond_wait函数内部。

程序示例:

  1. #include <stdio.h>  
  2. #include <pthread.h>  
  3. #include <unistd.h>  
  4.   
  5. pthread_mutex_t count_lock;  
  6. pthread_cond_t count_nonzero;  
  7. unsigned count = 0;  
  8.   
  9. void *decrement_count(void *arg)  
  10. {  
  11.     pthread_mutex_lock(&count_lock);  
  12.     printf("decrement_count get count_lock/n");  
  13.     while(count == 0)  
  14.     {  
  15.         printf("decrement_count count == 0 /n");  
  16.         printf("decrement_count before cond_wait /n");  
  17.         pthread_cond_wait(&count_nonzero, &count_lock);  
  18.         printf("decrement_count after cond_wait /n");  
  19.     }  
  20.   
  21.     count = count + 1;  
  22.     pthread_mutex_unlock(&count_lock);  
  23. }  
  24.   
  25. void *increment_count(void *arg)  
  26. {  
  27.     pthread_mutex_lock(&count_lock);  
  28.     printf("increment_count get count_lock /n");  
  29.     if(count == 0)  
  30.     {  
  31.         printf("increment_count before cond_signal /n");  
  32.         pthread_cond_signal(&count_nonzero);  
  33.         printf("increment_count after cond_signal /n");  
  34.     }  
  35.   
  36.     count = count + 1;  
  37.     pthread_mutex_unlock(&count_lock);  
  38. }  
  39.   
  40. int main(void)  
  41. {  
  42.     pthread_t tid1, tid2;  
  43.   
  44.     pthread_mutex_init(&count_lock, NULL);  
  45.     pthread_cond_init(&count_nonzero, NULL);  
  46.   
  47.     pthread_create(&tid1, NULL, decrement_count, NULL);  
  48.     sleep(2);  
  49.     pthread_create(&tid2, NULL, increment_count, NULL);  
  50.   
  51.     sleep(10);  
  52.     pthread_exit(0);  
  53.   
  54.     return 0;  
  55. }  

运行结果:

[work@db-testing-com06-vm3.db01.baidu.com pthread]$ gcc -o pthread_cond pthread_cond.c -lpthread
[work@db-testing-com06-vm3.db01.baidu.com pthread]$ ./pthread_cond 
decrement_count get count_lock
decrement_count count == 0 
decrement_count before cond_wait 
increment_count get count_lock 
increment_count before cond_signal 
increment_count after cond_signal 
decrement_count after cond_wait

 

转载声明: 本文转自 http://egeho123.blogbus.com/logs/10821816.html

 

===============================================================================

 

多线程编程,条件变量pthread_cond_t应用

 

程序代码:

 

  1. #include <stdio.h>  
  2. #include <pthread.h>  
  3. #include <unistd.h>  
  4.   
  5. pthread_mutex_t counter_lock;  
  6. pthread_cond_t counter_nonzero;  
  7. int counter = 0;  
  8. int estatus = -1;  
  9.   
  10. void *decrement_counter(void *argv);  
  11. void *increment_counter(void *argv);  
  12.   
  13. int main(int argc, char **argv)  
  14. {  
  15.     printf("counter: %d/n", counter);  
  16.     pthread_t thd1, thd2;  
  17.     int ret;  
  18.   
  19.     ret = pthread_create(&thd1, NULL, decrement_counter, NULL);  
  20.     if(ret){  
  21.         perror("del:/n");  
  22.         return 1;  
  23.     }  
  24.   
  25.     ret = pthread_create(&thd2, NULL, increment_counter, NULL);  
  26.     if(ret){  
  27.         perror("inc: /n");  
  28.         return 1;  
  29.     }  
  30.   
  31.     int counter = 0;  
  32.     while(counter != 10){  
  33.         printf("counter: %d/n", counter);  
  34.         sleep(1);  
  35.         counter++;  
  36.     }  
  37.   
  38.     return 0;  
  39. }  
  40.   
  41. void *decrement_counter(void *argv)  
  42. {  
  43.     pthread_mutex_lock(&counter_lock);  
  44.     while(counter == 0)  
  45.         pthread_cond_wait(&counter_nonzero, &counter_lock);  
  46.   
  47.     counter--;  
  48.     pthread_mutex_unlock(&counter_lock);  
  49.   
  50.     return &estatus;  
  51. }  
  52.   
  53. void *increment_counter(void *argv)  
  54. {  
  55.     pthread_mutex_lock(&counter_lock);  
  56.     if(counter == 0)  
  57.         pthread_cond_signal(&counter_nonzero);  
  58.   
  59.     counter++;  
  60.     pthread_mutex_unlock(&counter_lock);  
  61.   
  62.     return &estatus;  
  63. }  

 

运行结果:
[work@db-testing-com06-vm3.db01.baidu.com pthread]$ gcc -o pthread_cond2 pthread_cond2.c -lpthread
[work@db-testing-com06-vm3.db01.baidu.com pthread]$ ./pthread_cond2                               
counter: 0
counter: 0
counter: 1
counter: 2
counter: 3
counter: 4
counter: 5
counter: 6
counter: 7
counter: 8
counter: 9

 

调试程序的运行过程:

1、开始时 counter 为0 (main)
2、ret = pthread_create(&thrd1, NULL, decrement_counter, NULL)处生成一个thrd1线程运行decrement_counter(),
此线程内函数运行流程为:
先锁定 互斥锁(count_lock) 如果counter为0,此线程被阻塞在条件变量(count_nonzero)上.同时释放互斥锁count_lock(wait内部会先释放锁,等待signal激活后自动再加上锁).

 

3、与此同时主程序还在运行,创建另一个线程thrd2运行 increment_counter,
此线程内的函数流程如下:
先锁定 互斥锁(count_lock)【wait内部释放锁的互斥锁】 如果counter为0, 唤醒在条件变量(count_nonzero)上的线程即thrd1.但是由于有互斥锁count_lock【signal激活后,wait内部又自动加上锁了】, thrd1还是在等待. 然后count++,释放互斥锁,.......thrd1由于互斥锁释放,重新判断counter是不是为0,如果为0再把线程阻塞在条件变量count_nonzero上,但这时counter已经为1了.所以线程继续运行.counter--释放互斥锁......(退出后,运行主线程main
4、与此主程序间隔打印counter运行一段时间退出.

 注:更清晰的运行流程请详见如下“改进代码”

后记,在编译的时候加上 -lpthread
改进代码:
  1. #include <stdio.h>  
  2. #include <pthread.h>  
  3. #include <unistd.h>  
  4.   
  5. pthread_mutex_t counter_lock;  
  6. pthread_cond_t counter_nonzero;  
  7. int counter = 0;  
  8. int estatus = -1;  
  9.   
  10. void *decrement_counter(void *argv);  
  11. void *increment_counter(void *argv);  
  12.   
  13. int main(int argc, char **argv)  
  14. {  
  15.     printf("counter: %d/n", counter);  
  16.     pthread_t thd1, thd2;  
  17.     int ret;  
  18.   
  19.     ret = pthread_create(&thd1, NULL, decrement_counter, NULL);  
  20.     if(ret){  
  21.         perror("del:/n");  
  22.         return 1;  
  23.     }  
  24.   
  25.     ret = pthread_create(&thd2, NULL, increment_counter, NULL);  
  26.     if(ret){  
  27.         perror("inc: /n");  
  28.         return 1;  
  29.     }  
  30.   
  31.     int counter = 0;  
  32.     while(counter != 10){  
  33.         printf("counter(main): %d/n", counter);  
  34.         sleep(1);  
  35.         counter++;  
  36.     }  
  37.   
  38.     return 0;  
  39. }  
  40.   
  41. void *decrement_counter(void *argv)  
  42. {  
  43.     printf("counter(decrement): %d/n", counter);  
  44.     pthread_mutex_lock(&counter_lock);  
  45.     while(counter == 0)  
  46.         pthread_cond_wait(&counter_nonzero, &counter_lock); //进入阻塞(wait),等待激活(signal)  
  47.       
  48.     printf("counter--(before): %d/n", counter);      
  49.     counter--; //等待signal激活后再执行  
  50.     printf("counter--(after): %d/n", counter);      
  51.     pthread_mutex_unlock(&counter_lock);   
  52.   
  53.     return &estatus;  
  54. }  
  55.   
  56. void *increment_counter(void *argv)  
  57. {  
  58.     printf("counter(increment): %d/n", counter);  
  59.     pthread_mutex_lock(&counter_lock);  
  60.     if(counter == 0)  
  61.         pthread_cond_signal(&counter_nonzero); //激活(signal)阻塞(wait)的线程(先执行完signal线程,然后再执行wait线程)  
  62.   
  63.     printf("counter++(before): %d/n", counter);      
  64.     counter++;   
  65.     printf("counter++(after): %d/n", counter);      
  66.     pthread_mutex_unlock(&counter_lock);  
  67.   
  68.     return &estatus;  
  69. }  
运行结果:
[work@db-testing-com06-vm3.db01.baidu.com pthread]$ gcc -o pthread_cond2 pthread_cond2.c -lpthread
[work@db-testing-com06-vm3.db01.baidu.com pthread]$ ./pthread_cond2                               
counter: 0
counter(main): 0
counter(decrement): 0
counter(increment): 0
counter++(before): 0
counter++(after): 1
counter--(before): 1
counter--(after): 0
counter(main): 1
counter(main): 2
counter(main): 3
counter(main): 4
counter(main): 5
counter(main): 6
counter(main): 7
counter(main): 8
counter(main): 9

多线程--条件变量 - DZQABC - 博客园 - Google Chrome (2013/9/5 22:00:10)

条件变量函数

 

操作

相关函数说明

初始化条件变量

pthread_cond_init 语法

基于条件变量阻塞

pthread_cond_wait 语法

解除阻塞特定线程

pthread_cond_signal 语法

在指定的时间之前阻塞

pthread_cond_timedwait 语法

在指定的时间间隔内阻塞

pthread_cond_reltimedwait_np 语法

解除阻塞所有线程

pthread_cond_broadcast 语法

销毁条件变量状态

pthread_cond_destroy 语法

初始化条件变量

使用 pthread_cond_init(3C) 可以将 cv 所指示的条件变量初始化为其缺省值,或者指定已经使用 pthread_condattr_init() 设置的条件变量属性。

pthread_cond_init 语法

int pthread_cond_init(pthread_cond_t *cv,

const pthread_condattr_t *cattr);
#include <pthread.h>

pthread_cond_t cv;

pthread_condattr_t cattr;

int ret;

/* initialize a condition variable to its default value */

ret = pthread_cond_init(&cv, NULL);

/* initialize a condition variable */

ret = pthread_cond_init(&cv, &cattr);

cattr 设置为 NULL。将 cattr 设置为 NULL 与传递缺省条件变量属性对象的地址等效,但是没有内存开销。对于 Solaris 线程,请参见cond_init 语法。

使用 PTHREAD_COND_INITIALIZER 宏可以将以静态方式定义的条件变量初始化为其缺省属性。PTHREAD_COND_INITIALIZER 宏与动态分配具有 null 属性的 pthread_cond_init() 等效,但是不进行错误检查。

多个线程决不能同时初始化或重新初始化同一个条件变量。如果要重新初始化或销毁某个条件变量,则应用程序必须确保该条件变量未被使用。

pthread_cond_init 返回值

pthread_cond_init() 在成功完成之后会返回零。其他任何返回值都表示出现了错误。如果出现以下任一情况,该函数将失败并返回对应的值。

EINVAL

描述:

cattr 指定的值无效。

EBUSY

描述:

条件变量处于使用状态。

EAGAIN

描述:

必要的资源不可用。

ENOMEM

描述:

内存不足,无法初始化条件变量。

基于条件变量阻塞

使用 pthread_cond_wait(3C) 可以以原子方式释放 mp 所指向的互斥锁,并导致调用线程基于 cv 所指向的条件变量阻塞。对于 Solaris 线程,请参见cond_wait 语法。

pthread_cond_wait 语法

int pthread_cond_wait(pthread_cond_t *cv,pthread_mutex_t *mutex);
#include <pthread.h>

pthread_cond_t cv;

pthread_mutex_t mp;

int ret;

/* wait on condition variable */

ret = pthread_cond_wait(&cv, &mp);

阻塞的线程可以通过 pthread_cond_signal() 或 pthread_cond_broadcast() 唤醒,也可以在信号传送将其中断时唤醒。

不能通过 pthread_cond_wait() 的返回值来推断与条件变量相关联的条件的值的任何变化。必须重新评估此类条件。

pthread_cond_wait() 例程每次返回结果时调用线程都会锁定并且拥有互斥锁,即使返回错误时也是如此。

该条件获得信号之前,该函数一直被阻塞。该函数会在被阻塞之前以原子方式释放相关的互斥锁,并在返回之前以原子方式再次获取该互斥锁。

通常,对条件表达式的评估是在互斥锁的保护下进行的。如果条件表达式为假,线程会基于条件变量阻塞。然后,当该线程更改条件值时,另一个线程会针对条件变量发出信号。这种变化会导致所有等待该条件的线程解除阻塞并尝试再次获取互斥锁。

必须重新测试导致等待的条件,然后才能从 pthread_cond_wait() 处继续执行。唤醒的线程重新获取互斥锁并从 pthread_cond_wait() 返回之前,条件可能会发生变化。等待线程可能并未真正唤醒。建议使用的测试方法是,将条件检查编写为调用 pthread_cond_wait() 的 while() 循环。

pthread_mutex_lock();

while(condition_is_false)

pthread_cond_wait();

pthread_mutex_unlock();

如果有多个线程基于该条件变量阻塞,则无法保证按特定的顺序获取互斥锁。


注 –

pthread_cond_wait() 是取消点。如果取消处于暂挂状态,并且调用线程启用了取消功能,则该线程会终止,并在继续持有该锁的情况下开始执行清除处理程序。


pthread_cond_wait 返回值

pthread_cond_wait() 在成功完成之后会返回零。其他任何返回值都表示出现了错误。如果出现以下情况,该函数将失败并返回对应的值。

EINVAL

描述:

cv 或 mp 指定的值无效。

解除阻塞一个线程

对于基于 cv 所指向的条件变量阻塞的线程,使用 pthread_cond_signal(3C) 可以解除阻塞该线程。对于 Solaris 线程,请参见cond_signal 语法。

pthread_cond_signal 语法

int pthread_cond_signal(pthread_cond_t *cv);
#include <pthread.h>

pthread_cond_t cv;

int ret;

/* one condition variable is signaled */

ret = pthread_cond_signal(&cv);

应在互斥锁的保护下修改相关条件,该互斥锁用于获得信号的条件变量中。否则,可能在条件变量的测试和 pthread_cond_wait() 阻塞之间修改该变量,这会导致无限期等待。

调度策略可确定唤醒阻塞线程的顺序。对于 SCHED_OTHER,将按优先级顺序唤醒线程。

如果没有任何线程基于条件变量阻塞,则调用 pthread_cond_signal() 不起作用。


示例 4–8 使用 pthread_cond_wait() 和 pthread_cond_signal()

 

pthread_mutex_t count_lock;

pthread_cond_t count_nonzero;

unsigned count;

decrement_count()

{

pthread_mutex_lock(&count_lock);

while (count == 0)

pthread_cond_wait(&count_nonzero, &count_lock);

count = count - 1;

pthread_mutex_unlock(&count_lock);

}

increment_count()

{

pthread_mutex_lock(&count_lock);

if (count == 0)

pthread_cond_signal(&count_nonzero);

count = count + 1;

pthread_mutex_unlock(&count_lock);

}

pthread_cond_signal 返回值

pthread_cond_signal() 在成功完成之后会返回零。其他任何返回值都表示出现了错误。如果出现以下情况,该函数将失败并返回对应的值。

EINVAL

描述:

cv 指向的地址非法。

说明了如何使用 pthread_cond_wait() 和 pthread_cond_signal()

在指定的时间之前阻塞

pthread_cond_timedwait(3C) 的用法与 pthread_cond_wait() 的用法基本相同,区别在于在由 abstime 指定的时间之后 pthread_cond_timedwait() 不再被阻塞。

pthread_cond_timedwait 语法

int pthread_cond_timedwait(pthread_cond_t *cv,

pthread_mutex_t *mp, const struct timespec *abstime);
#include <pthread.h>

#include <time.h>

pthread_cond_t cv;

pthread_mutex_t mp;

timestruct_t abstime;

int ret;

/* wait on condition variable */

ret = pthread_cond_timedwait(&cv, &mp, &abstime);

pthread_cond_timewait() 每次返回时调用线程都会锁定并且拥有互斥锁,即使 pthread_cond_timedwait() 返回错误时也是如此。 对于 Solaris 线程

pthread_cond_timedwait() 函数会一直阻塞,直到该条件获得信号,或者最后一个参数所指定的时间已过为止。


注 –

pthread_cond_timedwait() 也是取消点。



示例 4–9 计时条件等待

 

pthread_timestruc_t to;

pthread_mutex_t m;

pthread_cond_t c;

...

pthread_mutex_lock(&m);

to.tv_sec = time(NULL) + TIMEOUT;

to.tv_nsec = 0;

while (cond == FALSE) {

err = pthread_cond_timedwait(&c, &m, &to);

if (err == ETIMEDOUT) {

/* timeout, do something */

break;

}

}

pthread_mutex_unlock(&m);

pthread_cond_timedwait 返回值

pthread_cond_timedwait() 在成功完成之后会返回零。其他任何返回值都表示出现了错误。如果出现以下任一情况,该函数将失败并返回对应的值。

EINVAL

描述:

cv 或 abstime 指向的地址非法。

ETIMEDOUT

描述:

abstime 指定的时间已过。

超时会指定为当天时间,以便在不重新计算值的情况下高效地重新测试条件,如示例 4–9 中所示。

在指定的时间间隔内阻塞

pthread_cond_reltimedwait_np(3C) 的用法与 pthread_cond_timedwait() 的用法基本相同,唯一的区别在于 pthread_cond_reltimedwait_np() 会采用相对时间间隔而不是将来的绝对时间作为其最后一个参数的值。

pthread_cond_reltimedwait_np 语法

int  pthread_cond_reltimedwait_np(pthread_cond_t *cv, 

pthread_mutex_t *mp,

const struct timespec *reltime);
#include <pthread.h>

#include <time.h>

pthread_cond_t cv;

pthread_mutex_t mp;

timestruct_t reltime;

int ret;

/* wait on condition variable */

ret = pthread_cond_reltimedwait_np(&cv, &mp, &reltime);

pthread_cond_reltimedwait_np() 每次返回时调用线程都会锁定并且拥有互斥锁,即使 pthread_cond_reltimedwait_np() 返回错误时也是如此。对于 Solaris 线程,请参见 cond_reltimedwait(3C)pthread_cond_reltimedwait_np() 函数会一直阻塞,直到该条件获得信号,或者最后一个参数指定的时间间隔已过为止。


注 –

pthread_cond_reltimedwait_np() 也是取消点。


pthread_cond_reltimedwait_np 返回值

pthread_cond_reltimedwait_np() 在成功完成之后会返回零。其他任何返回值都表示出现了错误。如果出现以下任一情况,该函数将失败并返回对应的值。

EINVAL

描述:

cv 或 reltime 指示的地址非法。

ETIMEDOUT

描述:

reltime 指定的时间间隔已过。

解除阻塞所有线程

对于基于 cv 所指向的条件变量阻塞的线程,使用 pthread_cond_broadcast(3C) 可以解除阻塞所有这些线程,这由 pthread_cond_wait() 来指定。

pthread_cond_broadcast 语法

int pthread_cond_broadcast(pthread_cond_t *cv);
#include <pthread.h>

pthread_cond_t cv;

int ret;

/* all condition variables are signaled */

ret = pthread_cond_broadcast(&cv);

如果没有任何线程基于该条件变量阻塞,则调用 pthread_cond_broadcast() 不起作用。对于 Solaris 线程,请参见cond_broadcast 语法。

由于 pthread_cond_broadcast() 会导致所有基于该条件阻塞的线程再次争用互斥锁,因此请谨慎使用 pthread_cond_broadcast()。例如,通过使用pthread_cond_broadcast(),线程可在资源释放后争用不同的资源量,如示例 4–10 中所示。


示例 4–10 条件变量广播

 

pthread_mutex_t rsrc_lock;

pthread_cond_t rsrc_add;

unsigned int resources;

get_resources(int amount)

{

pthread_mutex_lock(&rsrc_lock);

while (resources < amount) {

pthread_cond_wait(&rsrc_add, &rsrc_lock);

}

resources -= amount;

pthread_mutex_unlock(&rsrc_lock);

}

add_resources(int amount)

{

pthread_mutex_lock(&rsrc_lock);

resources += amount;

pthread_cond_broadcast(&rsrc_add);

pthread_mutex_unlock(&rsrc_lock);

}

请注意,在 add_resources() 中,首先更新 resources 还是首先在互斥锁中调用 pthread_cond_broadcast() 无关紧要。

应在互斥锁的保护下修改相关条件,该互斥锁用于获得信号的条件变量中。否则,可能在条件变量的测试和 pthread_cond_wait() 阻塞之间修改该变量,这会导致无限期等待。

pthread_cond_broadcast 返回值

pthread_cond_broadcast() 在成功完成之后会返回零。其他任何返回值都表示出现了错误。如果出现以下情况,该函数将失败并返回对应的值。

EINVAL

描述:

cv 指示的地址非法。

销毁条件变量状态

使用 pthread_cond_destroy(3C) 可以销毁与 cv 所指向的条件变量相关联的任何状态。对于 Solaris 线程,请参见cond_destroy 语法。

pthread_cond_destroy 语法

int pthread_cond_destroy(pthread_cond_t *cv);
#include <pthread.h>

pthread_cond_t cv;

int ret;

/* Condition variable is destroyed */

ret = pthread_cond_destroy(&cv);

请注意,没有释放用来存储条件变量的空间。

pthread_cond_destroy 返回值

pthread_cond_destroy() 在成功完成之后会返回零。其他任何返回值都表示出现了错误。如果出现以下情况,该函数将失败并返回对应的值。

EINVAL

描述:

cv 指定的值无效。

注意:pthread_cond_destroy 销毁一个条件变量,释放它拥有的资源。进入 pthread_cond_destroy 之前,必须没有在该条件变量上等待的线程。
Attempting to destroy a condition variable upon which other threads are currently blocked results in undefined behavior.

唤醒丢失问题

如果线程未持有与条件相关联的互斥锁,则调用 pthread_cond_signal() 或 pthread_cond_broadcast() 会产生唤醒丢失错误。

满足以下所有条件时,即会出现唤醒丢失问题:

  • 一个线程调用 pthread_cond_signal() 或 pthread_cond_broadcast()

  • 另一个线程已经测试了该条件,但是尚未调用 pthread_cond_wait()

  • 没有正在等待的线程

    信号不起作用,因此将会丢失

仅当修改所测试的条件但未持有与之相关联的互斥锁时,才会出现此问题。只要仅在持有关联的互斥锁同时修改所测试的条件,即可调用 pthread_cond_signal() 和pthread_cond_broadcast(),而无论这些函数是否持有关联的互斥锁。

生成方和使用者问题

并发编程中收集了许多标准的众所周知的问题,生成方和使用者问题只是其中的一个问题。此问题涉及到一个大小限定的缓冲区和两类线程(生成方和使用者),生成方将项放入缓冲区中,然后使用者从缓冲区中取走项。

生成方必须在缓冲区中有可用空间之后才能向其中放置内容。使用者必须在生成方向缓冲区中写入之后才能从中提取内容。

条件变量表示一个等待某个条件获得信号的线程队列。

示例 4–11 中包含两个此类队列。一个队列 (less) 针对生成方,用于等待缓冲区中出现空位置。另一个队列 (more) 针对使用者,用于等待从缓冲槽位的空位置中提取其中包含的信息。该示例中还包含一个互斥锁,因为描述该缓冲区的数据结构一次只能由一个线程访问。


示例 4–11 生成方和使用者的条件变量问题

 

typedef struct {

char buf[BSIZE];

int occupied;

int nextin;

int nextout;

pthread_mutex_t mutex;

pthread_cond_t more;

pthread_cond_t less;

} buffer_t;

buffer_t buffer;

如示例 4–12 中所示,生成方线程获取该互斥锁以保护 buffer 数据结构,然后,缓冲区确定是否有空间可用于存放所生成的项。如果没有可用空间,生成方线程会调用pthread_cond_wait()pthread_cond_wait() 会导致生成方线程连接正在等待 less 条件获得信号的线程队列。less 表示缓冲区中的可用空间。

与此同时,在调用 pthread_cond_wait() 的过程中,该线程会释放互斥锁的锁定。正在等待的生成方线程依赖于使用者线程在条件为真时发出信号,如示例 4–12 中所示。该条件获得信号时,将会唤醒等待 less 的第一个线程。但是,该线程必须再次锁定互斥锁,然后才能从 pthread_cond_wait() 返回。

获取互斥锁可确保该线程再次以独占方式访问缓冲区的数据结构。该线程随后必须检查缓冲区中是否确实存在可用空间。如果空间可用,该线程会向下一个可用的空位置中进行写入。

与此同时,使用者线程可能正在等待项出现在缓冲区中。这些线程正在等待条件变量 more。刚在缓冲区中存储内容的生成方线程会调用 pthread_cond_signal() 以唤醒下一个正在等待的使用者。如果没有正在等待的使用者,此调用将不起作用。

最后,生成方线程会解除锁定互斥锁,从而允许其他线程处理缓冲区的数据结构。


示例 4–12 生成方和使用者问题:生成方

 

void producer(buffer_t *b, char item)

{

pthread_mutex_lock(&b->mutex);



while (b->occupied >= BSIZE)

pthread_cond_wait(&b->less, &b->mutex);

assert(b->occupied < BSIZE);

b->buf[b->nextin++] = item;

b->nextin %= BSIZE;

b->occupied++;

/* now: either b->occupied < BSIZE and b->nextin is the index

of the next empty slot in the buffer, or

b->occupied == BSIZE and b->nextin is the index of the

next (occupied) slot that will be emptied by a consumer

(such as b->nextin == b->nextout) */

pthread_cond_signal(&b->more);

pthread_mutex_unlock(&b->mutex);

}

请注意 assert() 语句的用法。除非在编译代码时定义了 NDEBUG,否则 assert() 在其参数的计算结果为真(非零)时将不执行任何操作。如果参数的计算结果为假(零),则该程序会中止。在多线程程序中,此类断言特别有用。如果断言失败,assert() 会立即指出运行时问题。assert() 还有另一个作用,即提供有用的注释。

以 /* now: either b->occupied ... 开头的注释最好以断言形式表示,但是由于语句过于复杂,无法用布尔值表达式来表示,因此将用英语表示。

断言和注释都是不变量的示例。这些不变量是逻辑语句,在程序正常执行时不应将其声明为假,除非是线程正在修改不变量中提到的一些程序变量时的短暂修改过程中。当然,只要有线程执行语句,断言就应当为真。

使用不变量是一种极为有用的方法。即使没有在程序文本中声明不变量,在分析程序时也应将其视为不变量。

每次线程执行包含注释的代码时,生成方代码中表示为注释的不变量始终为真。如果将此注释移到紧挨 mutex_unlock() 的后面,则注释不一定仍然为真。如果将此注释移到紧跟assert() 之后的位置,则注释仍然为真。

因此,不变量可用于表示一个始终为真的属性,除非一个生成方或一个使用者正在更改缓冲区的状态。线程在互斥锁的保护下处理缓冲区时,该线程可能会暂时声明不变量为假。但是,一旦线程结束对缓冲区的操作,不变量即会恢复为真。

示例 4–13 给出了使用者的代码。该逻辑流程与生成方的逻辑流程相对称。


示例 4–13 生成方和使用者问题:使用者

 

char consumer(buffer_t *b)

{

char item;

pthread_mutex_lock(&b->mutex);

while(b->occupied <= 0)

pthread_cond_wait(&b->more, &b->mutex);

assert(b->occupied > 0);

item = b->buf[b->nextout++];

b->nextout %= BSIZE;

b->occupied--;

/* now: either b->occupied > 0 and b->nextout is the index

of the next occupied slot in the buffer, or

b->occupied == 0 and b->nextout is the index of the next

(empty) slot that will be filled by a producer (such as

b->nextout == b->nextin) */

pthread_cond_signal(&b->less);

pthread_mutex_unlock(&b->mutex);

return(item);

}

线程的分离状态 pthread_attr_setdetachstate 函数使用 - 轻飘飞扬 - 博客频道 - CSDN.NET - Google Chrome (2013/9/5 16:53:00)

分类: 【Linux应用开发】2012-09-27 09:23 800人阅读 评论(0) 收藏 举报

在任何一个时间点上,线程是可结合的(joinable),或者是分离的(detached)。一个可结合的线程能够被其他线程收回其资源和杀死;在被其他线程回收之前,它的存储器资源(如栈)是不释放的。相反,一个分离的线程是不能被其他线程回收或杀死的,它的存储器资源在它终止时由系统自动释放。

        线程的分离状态决定一个线程以什么样的方式来终止自己。在默认情况下线程是非分离状态的,这种情况下,原有的线程等待创建的线程结束。只有当pthread_join()函数返回时,创建的线程才算终止,才能释放自己占用的系统资源。而分离线程不是这样子的,它没有被其他的线程所等待,自己运行结束了,线程也就终止了,马上释放系统资源。程序员应该根据自己的需要,选择适当的分离状态。所以如果我们在创建线程时就知道不需要了解线程的终止状态,则可以pthread_attr_t结构中的detachstate线程属性,让线程以分离状态启动。

设置线程分离状态的函数为pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate)。第二个参数可选为PTHREAD_CREATE_DETACHED(分离线程)和 PTHREAD _CREATE_JOINABLE(非分离线程)。这里要注意的一点是,如果设置一个线程为分离线程,而这个线程运行又非常快,它很可能在pthread_create函数返回之前就终止了,它终止以后就可能将线程号和系统资源移交给其他的线程使用,这样调用pthread_create的线程就得到了错误的线程号。要避免这种情况可以采取一定的同步措施,最简单的方法之一是可以在被创建的线程里调用pthread_cond_timewait函数,让这个线程等待一会儿,留出足够的时间让函数pthread_create返回。设置一段等待时间,是在多线程编程里常用的方法。但是注意不要使用诸如wait()之类的函数,它们是使整个进程睡眠,并不能解决线程同步的问题。

另外一个可能常用的属性是线程的优先级,它存放在结构sched_param中。用函数pthread_attr_getschedparam和函数pthread_attr_setschedparam进行存放,一般说来,我们总是先取优先级,对取得的值修改后再存放回去。

线程等待——正确处理线程终止

#include <pthread.h>

void pthread_exit(void *retval);

void pthread_join(pthread_t th,void *thread_return);//挂起等待th结束,*thread_return=retval;

int pthread_detach(pthread_t th);

如果线程处于joinable状态,则只能只能被创建他的线程等待终止。

在Linux平台默认情况下,虽然各个线程之间是相互独立的,一个线程的终止不会去通知或影响其他的线程。但是已经终止的线程的资源并不会随着线程的终止而得到释放,我们需要调用 pthread_join() 来获得另一个线程的终止状态并且释放该线程所占的资源。(说明:线程处于joinable状态下)

调用该函数的线程将挂起,等待 th 所表示的线程的结束。 thread_return 是指向线程 th 返回值的指针。需要注意的是 th 所表示的线程必须是 joinable 的,即处于非 detached(游离)状态;并且只可以有唯一的一个线程对 th 调用 pthread_join() 。如果 th 处于 detached 状态,那么对 th 的 pthread_join() 调用将返回错误。

如果不关心一个线程的结束状态,那么也可以将一个线程设置为 detached 状态,从而让操作系统在该线程结束时来回收它所占的资源。将一个线程设置为detached 状态可以通过两种方式来实现。一种是调用 pthread_detach() 函数,可以将线程 th 设置为 detached 状态。另一种方法是在创建线程时就将它设置为 detached 状态,首先初始化一个线程属性变量,然后将其设置为 detached 状态,最后将它作为参数传入线程创建函数 pthread_create(),这样所创建出来的线程就直接处于 detached 状态。

创建 detach 线程:

pthread_t tid;

pthread_attr_t attr;

pthread_attr_init(&attr);

pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

pthread_create(&tid, &attr, THREAD_FUNCTION, arg);

总之为了在使用 pthread 时避免线程的资源在线程结束时不能得到正确释放,从而避免产生潜在的内存泄漏问题,在对待线程结束时,要确保该线程处于 detached 状态,否着就需要调用 pthread_join() 函数来对其进行资源回收。

linux c学习笔记----消息队列(ftok,msgget,msgsnd,msgrcv,msgctl) - 知知为知知 - ITeye技术网站 - Google Chrome (2013/7/3 19:18:54)

 

ftok()

#include <sys/types.h>

#include <sys/ipc.h>

函数原型:

 key_t  ftok( const  char * pathname , int   proj_id  );

参数:

  pathname 就时你指定的文件名(该文件必须是存在而且可以访问的),id是子序号,虽         然为int,但是只有8个比特被使用(0-255)。

返回值: 成功时候返回key_t 类型的key值,失败返回-1

msgget

#include <sys/types.h>

#include <sys/ipc.h>

#include <sys/msg.h>

函数原型: int    msgget ( key_t  key , int  msgflg );

函数描述:建立消息队列

参数:

msgget()函数的第一个参数是消息队列对象的关键字(key),函数将它与已有的消息队
列对象的关键字进行比较来判断消息队列对象是否已经创建。而函数进行的具体操作是 由第二个参数,msgflg 控制的。它可以取下面的几个值:
IPC_CREAT :如果消息队列对象不存在,则创建之,否则则进行打开操作;
IPC_EXCL:和IPC_CREAT 一起使用(用”|”连接),如果消息对象不存在则创建之,否     则产生一个错误并返回。

返回值:

成功时返回队列ID,失败返回-1,错误原因存于error中

EEXIST (Queue exists, cannot create)
EIDRM (Queue is marked for deletion)
ENOENT (Queue does not exist)
ENOMEM (Not enough memory to create queue)
ENOSPC (Maximum queue limit exceeded)

msgsnd函数:将消息送入消息队列

#include <sys/types.h>

#include <sys/ipc.h>

#include <sys/msg.h

函数原型:int  msgsnd ( int msgid ,  struct msgbuf*msgp , int msgsz, int msgflg );

参数说明:

传给msgsnd()函数的第一个参数msqid 是消息队列对象的标识符(由msgget()函数得

到),第二个参数msgp 指向要发送的消息所在的内存,第三个参数msgsz 是要发送信息     的长度(字节数),可以用以下的公式计算:
msgsz = sizeof(struct mymsgbuf) - sizeof(long);
第四个参数是控制函数行为的标志,可以取以下的值:
0,忽略标志位;
IPC_NOWAIT,如果消息队列已满,消息将不被写入队列,控制权返回调用函数的线
程。如果不指定这个参数,线程将被阻塞直到消息被可以被写入。

smgbuf结构体定义如下:

struct smgbuf

{

                     long   mtype;

                    char   mtext [x] ;  //长度由msgsz决定

}

msgflg 可设置为 IPC_NOWAIT 。如果消息队列已满或其他情况无法送入消息,则立即  返回 EAGIN

返回: 0 on success

-1 on error: errno = EAGAIN (queue is full, and IPC_NOWAIT was asserted)
EACCES (permission denied, no write permission)
EFAULT (msgp address isn't accessable – invalid)
EIDRM (The message queue has been removed)
EINTR (Received a signal while waiting to write)
EINVAL (Invalid message queue identifier, nonpositive
message type, or invalid message size)
ENOMEM (Not enough memory to copy message buffer)

msgrcv函数:从消息队列中读取消息

#include <sys/types.h>

#include <sys/ipc.h>

#include <sys/msg.h>

函数定义:int  msgrcv( int  msgid , struct   msgbuf*  msgp ,  int msgsz ,  long msgtyp, int msgflg);

参数:

函数的前三个参数和msgsnd()函数中对应的参数的含义是相同的。第四个参数mtype

指定了函数从队列中所取的消息的类型。函数将从队列中搜索类型与之匹配的消息并将 之返回。不过这里有一个例外。如果mtype 的值是零的话,函数将不做类型检查而自动返     回队列中的最旧的消息。第五个参数依然是是控制函数行为的标志,取值可以是:
0,表示忽略;
IPC_NOWAIT,如果消息队列为空,则返回一个ENOMSG,并将控制权交回调用函数
的进程。如果不指定这个参数,那么进程将被阻塞直到函数可以从队列中得到符合条件 的消息为止。如果一个client 正在等待消息的时候队列被删除,EIDRM 就会被返回。如果     进程在阻塞等待过程中收到了系统的中断信号,EINTR 就会被返回。
MSG_NOERROR,如果函数取得的消息长度大于msgsz,将只返回msgsz 长度的信息,
剩下的部分被丢弃了。如果不指定这个参数,E2BIG 将被返回,而消息则留在队列中不     被取出。
当消息从队列内取出后,相应的消息就从队列中删除了。

msgbuf:结构体,定义如下:

struct msgbuf

{

                      long  mtype ;  //信息种类

                       char   mtest[x];//信息内容   ,长度由msgsz指定

}

msgtyp:  信息类型。 取值如下:

 msgtyp = 0 ,不分类型,直接返回消息队列中的第一项

 msgtyp > 0 ,返回第一项 msgtyp与 msgbuf结构体中的mtype相同的信息

msgtyp <0 , 返回第一项 mtype小于等于msgtyp绝对值的信息

msgflg:取值如下:

IPC_NOWAIT ,不阻塞

IPC_NOERROR ,若信息长度超过参数msgsz,则截断信息而不报错。

返回值:

成功时返回所获取信息的长度,失败返回-1,错误信息存于error

Number of bytes copied into message buffer
-1 on error: errno = E2BIG (Message length is greater than
msgsz,no MSG_NOERROR)
EACCES (No read permission)
EFAULT (Address pointed to by msgp is invalid)
EIDRM (Queue was removed during retrieval)
EINTR (Interrupted by arriving signal)
EINVAL (msgqid invalid, or msgsz less than 0)
ENOMSG (IPC_NOWAIT asserted, and no message exists in the queue to satisfy the request)

 

 

 

 例子:

server.c


  1. <span><strong><em><span style="font-size: small;">#include <stdio.h>  
  2. #include <string.h>  
  3. #include <stdlib.h>  
  4. #include <errno.h>  
  5. #include <unistd.h>  
  6. #include <sys/types.h>  
  7. #include <sys/ipc.h>  
  8. #include <sys/stat.h>  
  9. #include <sys/msg.h>  
  10. #define MSG_FILE "server.c"  
  11. #define BUFFER 255  
  12. #define PERM S_IRUSR|S_IWUSR  
  13. struct msgtype {  
  14. long mtype;  
  15. char buffer[BUFFER+1];  
  16. };  
  17. int main()  
  18. {  
  19. struct msgtype msg;  
  20. key_t key;  
  21. int msgid;  
  22. if((key=ftok(MSG_FILE,'a'))==-1)  
  23. {  
  24. fprintf(stderr,"Creat Key Error:%s\a\n",strerror(errno));  
  25. exit(1);  
  26. }  
  27. if((msgid=msgget(key,PERM|IPC_CREAT|IPC_EXCL))==-1)  
  28. {  
  29. fprintf(stderr,"Creat Message Error:%s\a\n",strerror(errno));  
  30. exit(1);  
  31. }  
  32. while(1)  
  33. {  
  34. msgrcv(msgid,&msg,sizeof(struct msgtype),1,0);  
  35. fprintf(stderr,"Server Receive:%s\n",msg.buffer);  
  36. msg.mtype=2;  
  37. msgsnd(msgid,&msg,sizeof(struct msgtype),0);  
  38. }  
  39. exit(0);  
  40. }</span></em></strong></span>  

 client.c


  1. <span><strong><em><span style="font-size: small;">#include <stdio.h>  
  2. #include <string.h>  
  3. #include <stdlib.h>  
  4. #include <errno.h>  
  5. #include <sys/types.h>  
  6. #include <sys/ipc.h>  
  7. #include <sys/msg.h>  
  8. #include <sys/stat.h>  
  9. #define MSG_FILE "server.c"  
  10. #define BUFFER 255  
  11. #define PERM S_IRUSR|S_IWUSR  
  12. struct msgtype {  
  13. long mtype;  
  14. char buffer[BUFFER+1];  
  15. };  
  16. int main(int argc,char **argv)  
  17. {  
  18. struct msgtype msg;  
  19. key_t key;  
  20. int msgid;  
  21. if(argc!=2)  
  22. {  
  23. fprintf(stderr,"Usage:%s string\n\a",argv[0]);  
  24. exit(1);  
  25. }  
  26. if((key=ftok(MSG_FILE,'a'))==-1)  
  27. {  
  28. fprintf(stderr,"Creat Key Error:%s\a\n",strerror(errno));  
  29. exit(1);  
  30. }  
  31. if((msgid=msgget(key,PERM))==-1)  
  32. {  
  33. fprintf(stderr,"Creat Message Error:%s\a\n",strerror(errno));  
  34. exit(1);  
  35. }  
  36. msg.mtype=1;  
  37. strncpy(msg.buffer,argv[1],BUFFER);  
  38. msgsnd(msgid,&msg,sizeof(struct msgtype),0);  
  39. memset(&msg,'\0',sizeof(struct msgtype));  
  40. msgrcv(msgid,&msg,sizeof(struct msgtype),2,0);  
  41. fprintf(stderr,"Client receive:%s\n",msg.buffer);  
  42. exit(0);  
  43. }</span></em></strong></span>  

Linux下c开发 之 线程通信(转) - 土猫敢死队 - 博客园 - Google Chrome (2013/7/3 10:10:22)


Linux下c开发 之 线程通信(转)

1.Linux“线程”

     进程与线程之间是有区别的,不过Linux内核只提供了轻量进程的支持,未实现线程模型。Linux是一种“多进程单线程”的操作系统。Linux本身只有进程的概念,而其所谓的“线程”本质上在内核里仍然是进程。

     大家知道,进程是资源分配的单位,同一进程中的多个线程共享该进程的资源(如作为共享内存的全局变量)。Linux中所谓的“线程”只是在被创建时clone了父进程的资源,因此clone出来的进程表现为“线程”,这一点一定要弄清楚。因此,Linux“线程”这个概念只有在打冒号的情况下才是最准确的。

     目前Linux中最流行的线程机制为LinuxThreads,所采用的就是线程-进程“一对一”模型,调度交给核心,而在用户级实现一个包括信号处理在内的线程管理机制。LinuxThreads由Xavier Leroy (Xavier.Leroy@inria.fr)负责开发完成,并已绑定在GLIBC中发行,它实现了一种BiCapitalized面向Linux的Posix 1003.1c “pthread”标准接口。Linuxthread可以支持Intel、Alpha、MIPS等平台上的多处理器系统。

  按照POSIX 1003.1c 标准编写的程序与Linuxthread 库相链接即可支持Linux平台上的多线程,在程序中需包含头文件pthread. h,在编译链接时使用命令:

gcc -D -REENTRANT -lpthread xxx. c

  其中-REENTRANT宏使得相关库函数(如stdio.h、errno.h中函数) 是可重入的、线程安全的(thread-safe),-lpthread则意味着链接库目录下的libpthread.a或libpthread.so文件。使用Linuxthread库需要2.0以上版本的Linux内核及相应版本的C库(libc 5.2.18、libc 5.4.12、libc 6)。

     2.“线程”控制

  线程创建

  进程被创建时,系统会为其创建一个主线程,而要在进程中创建新的线程,则可以调用pthread_create:

pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *
(start_routine)(void*), void *arg);

  start_routine为新线程的入口函数,arg为传递给start_routine的参数。

  每个线程都有自己的线程ID,以便在进程内区分。线程ID在pthread_create调用时回返给创建线程的调用者;一个线程也可以在创建后使用pthread_self()调用获取自己的线程ID:

pthread_self (void) ;

  线程退出

  线程的退出方式有三:

  (1)执行完成后隐式退出;

  (2)由线程本身显示调用pthread_exit 函数退出;

pthread_exit (void * retval) ;

  (3)被其他线程用pthread_cance函数终止:

pthread_cance (pthread_t thread) ;

  在某线程中调用此函数,可以终止由参数thread 指定的线程。

  如果一个线程要等待另一个线程的终止,可以使用pthread_join函数,该函数的作用是调用pthread_join的线程将被挂起直到线程ID为参数thread的线程终止:

pthread_join (pthread_t thread, void** threadreturn);

3.线程通信

  线程互斥

  互斥意味着“排它”,即两个线程不能同时进入被互斥保护的代码。Linux下可以通过pthread_mutex_t 定义互斥体机制完成多线程的互斥操作,该机制的作用是对某个需要互斥的部分,在进入时先得到互斥体,如果没有得到互斥体,表明互斥部分被其它线程拥有,此时欲获取互斥体的线程阻塞,直到拥有该互斥体的线程完成互斥部分的操作为止。

  下面的代码实现了对共享全局变量x 用互斥体mutex 进行保护的目的:

int x; // 进程中的全局变量
pthread_mutex_t mutex;
pthread_mutex_init(&mutex, NULL); //按缺省的属性初始化互斥体变量mutex
pthread_mutex_lock(&mutex); // 给互斥体变量加锁
… //对变量x 的操作
phtread_mutex_unlock(&mutex); // 给互斥体变量解除锁

  线程同步

  同步就是线程等待某个事件的发生。只有当等待的事件发生线程才继续执行,否则线程挂起并放弃处理器。当多个线程协作时,相互作用的任务必须在一定的条件下同步。

  Linux下的C语言编程有多种线程同步机制,最典型的是条件变量(condition variable)。pthread_cond_init用来创建一个条件变量,其函数原型为:

pthread_cond_init (pthread_cond_t *cond, const pthread_condattr_t *attr);

  pthread_cond_wait和pthread_cond_timedwait用来等待条件变量被设置,值得注意的是这两个等待调用需要一个已经上锁的互斥体mutex,这是为了防止在真正进入等待状态之前别的线程有可能设置该条件变量而产生竞争。pthread_cond_wait的函数原型为:

pthread_cond_wait (pthread_cond_t *cond, pthread_mutex_t *mutex);

  pthread_cond_broadcast用于设置条件变量,即使得事件发生,这样等待该事件的线程将不再阻塞:

pthread_cond_broadcast (pthread_cond_t *cond) ;

  pthread_cond_signal则用于解除某一个等待线程的阻塞状态:

pthread_cond_signal (pthread_cond_t *cond) ;

  pthread_cond_destroy 则用于释放一个条件变量的资源。

  在头文件semaphore.h 中定义的信号量则完成了互斥体和条件变量的封装,按照多线程程序设计中访问控制机制,控制对资源的同步访问,提供程序设计人员更方便的调用接口。

sem_init(sem_t *sem, int pshared, unsigned int val);

  这个函数初始化一个信号量sem 的值为val,参数pshared 是共享属性控制,表明是否在进程间共享。

sem_wait(sem_t *sem);

  调用该函数时,若sem为无状态,调用线程阻塞,等待信号量sem值增加(post )成为有信号状态;若sem为有状态,调用线程顺序执行,但信号量的值减一。

sem_post(sem_t *sem);

  调用该函数,信号量sem的值增加,可以从无信号状态变为有信号状态。

     

 

4.实例

  下面我们还是以名的生产者/消费者问题为例来阐述Linux线程的控制和通信。一组生产者线程与一组消费者线程通过缓冲区发生联系。生产者线程将生产的产品送入缓冲区,消费者线程则从中取出产品。缓冲区有N 个,是一个环形的缓冲池。

#include <stdio.h>
#include <pthread.h>
#define BUFFER_SIZE 16 // 缓冲区数量
struct prodcons
{
// 缓冲区相关数据结构
int buffer[BUFFER_SIZE]; /* 实际数据存放的数组*/
pthread_mutex_t lock; /* 互斥体lock 用于对缓冲区的互斥操作 */
int readpos, writepos; /* 读写指针*/
pthread_cond_t notempty; /* 缓冲区非空的条件变量 */
pthread_cond_t notfull; /* 缓冲区未满的条件变量 */
};
/* 初始化缓冲区结构 */
void init(struct prodcons *b)
{
pthread_mutex_init(&b->lock, NULL);
pthread_cond_init(&b->notempty, NULL);
pthread_cond_init(&b->notfull, NULL);
b->readpos = 0;
b->writepos = 0;
}
/* 将产品放入缓冲区,这里是存入一个整数*/
void put(struct prodcons *b, int data)
{
pthread_mutex_lock(&b->lock);
/* 等待缓冲区未满*/
if ((b->writepos + 1) % BUFFER_SIZE == b->readpos)
{
pthread_cond_wait(&b->notfull, &b->lock);
}
/* 写数据,并移动指针 */
b->buffer[b->writepos] = data;
b->writepos++;
if (b->writepos > = BUFFER_SIZE)
b->writepos = 0;
/* 设置缓冲区非空的条件变量*/
pthread_cond_signal(&b->notempty);
pthread_mutex_unlock(&b->lock);

/* 从缓冲区中取出整数*/
int get(struct prodcons *b)
{
int data;
pthread_mutex_lock(&b->lock);
/* 等待缓冲区非空*/
if (b->writepos == b->readpos)
{
pthread_cond_wait(&b->notempty, &b->lock);
}
/* 读数据,移动读指针*/
data = b->buffer[b->readpos];
b->readpos++;
if (b->readpos > = BUFFER_SIZE)
b->readpos = 0;
/* 设置缓冲区未满的条件变量*/
pthread_cond_signal(&b->notfull);
pthread_mutex_unlock(&b->lock);
return data;
}

/* 测试:生产者线程将1 到10000 的整数送入缓冲区,消费者线
程从缓冲区中获取整数,两者都打印信息*/
#define OVER ( - 1)
struct prodcons buffer;
void *producer(void *data)
{
int n;
for (n = 0; n < 10000; n++)
{
printf("%d --->\n", n);
put(&buffer, n);
} put(&buffer, OVER);
return NULL;
}

void *consumer(void *data)
{
int d;
while (1)
{
d = get(&buffer);
if (d == OVER)
break;
printf("--->%d \n", d);
}
return NULL;
}

int main(void)
{
pthread_t th_a, th_b;
void *retval;
init(&buffer);
/* 创建生产者和消费者线程*/
pthread_create(&th_a, NULL, producer, 0);
pthread_create(&th_b, NULL, consumer, 0);
/* 等待两个线程结束*/
pthread_join(th_a, &retval);
pthread_join(th_b, &retval);
return 0;
}

  5.WIN32、VxWorks、Linux线程类比

  目前为止,笔者已经创作了《基于嵌入式操作系统VxWorks的多任务并发程序设计》(《软件报》2006年5~12期连载)、《深入浅出Win32多线程程序设计》(天极网技术专题)系列,我们来找出这两个系列文章与本文的共通点。

   看待技术问题要瞄准其本质,不管是Linux、VxWorks还是WIN32,其涉及到多线程的部分都是那些内容,无非就是线程控制和线程通信,它们的许多函数只是名称不同,其实质含义是等价的,下面我们来列个三大操作系统共同点详细表单:

事项 WIN32 VxWorks Linux
线程创建 CreateThread taskSpawn pthread_create
线程终止 执行完成后退出;线程自身调用ExitThread函数即终止自己;被其他线程调用函数TerminateThread函数 执行完成后退出;由线程本身调用exit退出;被其他线程调用函数taskDelete终止 执行完成后退出;由线程本身调用pthread_exit 退出;被其他线程调用函数pthread_cance终止
获取线程ID GetCurrentThreadId taskIdSelf pthread_self
创建互斥 CreateMutex semMCreate pthread_mutex_init
获取互斥 WaitForSingleObject、WaitForMultipleObjects semTake pthread_mutex_lock
释放互斥 ReleaseMutex semGive phtread_mutex_unlock
创建信号量 CreateSemaphore semBCreate、semCCreate sem_init
等待信号量 WaitForSingleObject semTake sem_wait
释放信号量 ReleaseSemaphore semGive sem_post

   6.小结

  本章讲述了Linux下多线程的控制及线程间通信编程方法,给出了一个生产者/消费者的实例,并将Linux的多线程与WIN32、VxWorks多线程进行了类比,总结了一般规律。鉴于多线程编程已成为开发并发应用程序的主流方法,学好本章的意义也便不言自明。

Linux线程池(C语言描述) - 互斥量+条件变量同步 - Sean's Blog的日志 - 网易博客 - Google Chrome (2013/7/2 22:05:18)

Linux线程池(C语言描述) - 互斥量+条件变量同步

2012-04-24 18:16:50|  分类: Linux
C
 |  标签:线程池  linux  c  |字号 订阅

创建线程或者进程的开销是很大的,为了防止频繁的创建线程,提高程序的运行效率,往往都会建立一个线程池用于多线程程序的调度

下面的程序就是完整的线程池实现,主要采用互斥量和条件变量实现同步

首先定义头文件threadpool.h

在该文件中定义了线程池的数据结构和所有的函数

#ifndef
THREADPOOL_H_
#define THREADPOOL_H_

#include
<stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

/**
 * 线程体数据结构
 */
typedef struct
runner
{
 void
(*callback)(void* arg); // 回调函数指针
 void* arg;
// 回调函数的参数
 struct runner* next;
} thread_runner;

/**
 * 线程池数据结构
 */
typedef
struct
{
 pthread_mutex_t mutex; //互斥量
 pthread_cond_t cond; // 条件变量
 thread_runner* runner_head; // 线程池中所有等待任务的头指针
 thread_runner* runner_tail; // 线程池所有等待任务的尾指针
 int shutdown; // 线程池是否销毁
 pthread_t* threads; // 所有线程
 int max_thread_size; //线程池中允许的活动线程数目
} thread_pool;

/**
 * 线程体
 */
void run(void *arg);

/**
 * 初始化线程池
 *
 参数:
 *
  pool:指向线程池结构有效地址的动态指针
 *
  max_thread_size:最大的线程数
 */
void threadpool_init(thread_pool* pool, int
max_thread_size);

/**
 * 向线程池加入任务
 *
 参数:
 *
  pool:指向线程池结构有效地址的动态指针
 *
  callback:线程回调函数
 *
  arg:回调函数参数
 */
void threadpool_add_runner(thread_pool* pool, void
(*callback)(void *arg), void *arg);

/**
 * 销毁线程池
 *
 参数:
 *
  ppool:指向线程池结构有效地址的动态指针地址(二级指针),销毁后释放内存,该指针为NULL
 */
void
threadpool_destroy(thread_pool** ppool);

#endif

然后是函数实现threadpool.h

该文件实现了threadpool.h的函数定义

#include
"threadpool.h"

#define
DEBUG 1

/**
 * 线程体
 */
void run(void *arg)
{
 thread_pool* pool = (thread_pool*) arg;
 while (1)
 {
  //
加锁
  pthread_mutex_lock(&(pool->mutex));
#ifdef DEBUG
  printf("run-> locked\n");
#endif
  //
如果等待队列为0并且线程池未销毁,则处于阻塞状态
  while
(pool->runner_head == NULL && !pool->shutdown)
  {
   pthread_cond_wait(&(pool->cond),
&(pool->mutex));
  }
  //如果线程池已经销毁
  if (pool->shutdown)
  {
   // 解锁
   pthread_mutex_unlock(&(pool->mutex));
#ifdef DEBUG
   printf("run-> unlocked and thread
exit\n");
#endif
   pthread_exit(NULL);
  }
  // 取出链表中的头元素
  thread_runner *runner =
pool->runner_head;
  pool->runner_head = runner->next;
  // 解锁
  pthread_mutex_unlock(&(pool->mutex));
#ifdef DEBUG
  printf("run-> unlocked\n");
#endif
  //
调用回调函数,执行任务
  (runner->callback)(runner->arg);
  free(runner);
  runner = NULL;
#ifdef DEBUG
  printf("run-> runned and free
runner\n");
#endif
 }
 pthread_exit(NULL);
}

/**
 * 初始化线程池
 *
 参数:
 *
  pool:指向线程池结构有效地址的动态指针
 *
  max_thread_size:最大的线程数
 */
void threadpool_init(thread_pool* pool, int
max_thread_size)
{
 // 初始化互斥量
 pthread_mutex_init(&(pool->mutex),
NULL);
 // 初始化条件变量
 pthread_cond_init(&(pool->cond),
NULL);
 pool->runner_head = NULL;
 pool->runner_tail = NULL;
 pool->max_thread_size = max_thread_size;
 pool->shutdown = 0;
 // 创建所有分离态线程
 pool->threads = (pthread_t *)
malloc(max_thread_size * sizeof(pthread_t));
 int i = 0;
 for (i = 0; i < max_thread_size; i++)
 {
  pthread_attr_t attr;
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr,
PTHREAD_CREATE_DETACHED);
  pthread_create(&(pool->threads[i]),
&attr, (void*) run, (void*) pool);
 }
#ifdef DEBUG
 printf("threadpool_init-> create %d detached
thread\n", max_thread_size);
#endif
}

/**
 * 向线程池加入任务
 *
 参数:
 *
  pool:指向线程池结构有效地址的动态指针
 *
  callback:线程回调函数
 *
  arg:回调函数参数
 */
void threadpool_add_runner(thread_pool* pool, void
(*callback)(void *arg), void *arg)
{
 // 构造一个新任务
 thread_runner *newrunner = (thread_runner *)
malloc(sizeof(thread_runner));
 newrunner->callback = callback;
 newrunner->arg = arg;
 newrunner->next = NULL;
 // 加锁
 pthread_mutex_lock(&(pool->mutex));
#ifdef DEBUG
 printf("threadpool_add_runner->
locked\n");
#endif
 // 将任务加入到等待队列中
 if (pool->runner_head != NULL)
 {
  pool->runner_tail->next =
newrunner;
  pool->runner_tail =
newrunner;
 }
 else
 {
  pool->runner_head = newrunner;
  pool->runner_tail = newrunner;
 }
 // 解锁
 pthread_mutex_unlock(&(pool->mutex));
#ifdef DEBUG
 printf("threadpool_add_runner->
unlocked\n");
#endif
 // 唤醒一个等待线程
 pthread_cond_signal(&(pool->cond));
#ifdef DEBUG
 printf("threadpool_add_runner-> add a runner
and wakeup a waiting thread\n");
#endif
}

/**
 * 销毁线程池
 *
 参数:
 *
  ppool:指向线程池结构有效地址的动态指针地址(二级指针)
 */
void
threadpool_destroy(thread_pool** ppool)
{
 thread_pool *pool = *ppool;
 // 防止2次销毁
 if
(!pool->shutdown)
 {
  pool->shutdown = 1;
  // 唤醒所有等待线程,线程池要销毁了
  pthread_cond_broadcast(&(pool->cond));
  // 等待所有线程中止
  sleep(1);
#ifdef DEBUG
  printf("threadpool_destroy-> wakeup all
waiting threads\n");
#endif
  // 回收空间
  free(pool->threads);
  // 销毁等待队列
  thread_runner *head = NULL;
  while (pool->runner_head != NULL)
  {
   head = pool->runner_head;
   pool->runner_head =
pool->runner_head->next;
   free(head);
  }

#ifdef
DEBUG
  printf("threadpool_destroy->
all runners freed\n");
#endif
  /*条件变量和互斥量也别忘了销毁*/
  pthread_mutex_destroy(&(pool->mutex));
  pthread_cond_destroy(&(pool->cond));

#ifdef
DEBUG
  printf("threadpool_destroy->
mutex and cond destoryed\n");
#endif
  free(pool);
  (*ppool) = NULL;

#ifdef
DEBUG
  printf("threadpool_destroy->
pool freed\n");
#endif
 }
}

以上就是完整的线城池实现

下面写个测试程序来看看

#include
"threadpool.h"

void
threadrun(void* arg)
{
 int *i = (int *) arg;
 printf("%d\n", *i);
}

int
main(void)
{
 thread_pool *pool =
malloc(sizeof(thread_pool));
 threadpool_init(pool, 2);
 int i;
 int
tmp[3];
 for (i = 0; i < 3; i++)
 {
  tmp[i] = i;
  threadpool_add_runner(pool, threadrun,
&tmp[i]);
 }
 sleep(1);
 threadpool_destroy(&pool);
 printf("main-> %p\n",pool);
 printf("main-> test over\n");
 return 0;
}

该函数创建了1个线程池,包含2个线程,然后往线程池中加入了3个任务

并在最后打印了线程池的指针地址(主要是为了看线程池销毁后的状态)

运行结果如下

笔记整理--Linux多线程

输出的信息主要调试信息,

值得注意的是倒数第二行“main-> (nil)”

说明线程池销毁后,指向线程池的指针 =
NULL(这就是为什么在threadpool_destory函数为什么要用二级指针的原因)


[C++][Thread] 转:线程池原理及创建(C++实现) - 痛但快乐着 - 博客频道 - CSDN.NET - Google Chrome (2013/7/1 10:38:22)

分类: 其他2013-01-27 10:55 360人阅读 评论(0) 收藏 举报

      

        看不懂,先收藏
本文给出了一个通用的线程池框架,该框架将与线程执行相关的任务进行了高层次的抽象,使之与具体的执行任务无关。另外该线程池具有动态伸缩性,它能根据执行任务的轻重自动调整线程池中线程的数量。文章的最后,我们给出一个简单示例程序,通过该示例程序,我们会发现,通过该线程池框架执行多线程任务是多么的简单。

为什么需要线程池 
目前的大多数网络服务器,包括Web服务器、Email服务器以及数据库服务器等都具有一个共同点,就是单位时间内必须处理数目巨大的连接请求,但处理时间却相对较短。 
传统多线程方案中我们采用的服务器模型则是一旦接受到请求之后,即创建一个新的线程,由该线程执行任务。任务执行完毕后,线程退出,这就是是“即时创建,即时销毁”的策略。尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于不停的创建线程,销毁线程的状态。
我们将传统方案中的线程执行过程分为三个过程:T1、T2、T3。 


T1:线程创建时间 
T2:线程执行时间,包括线程的同步等时间 
T3:线程销毁时间 


那么我们可以看出,线程本身的开销所占的比例为(T1+T3) / (T1+T2+T3)。如果线程执行的时间很短的话,这比开销可能占到20%-50%左右。如果任务执行时间很频繁的话,这笔开销将是不可忽略的。

除此之外,线程池能够减少创建的线程个数。通常线程池所允许的并发线程是有上界的,如果同时需要并发的线程数超过上界,那么一部分线程将会等待。而传统方案中,如果同时请求数目为2000,那么最坏情况下,系统可能需要产生2000个线程。尽管这不是一个很大的数目,但是也有部分机器可能达不到这种要求。

因此线程池的出现正是着眼于减少线程池本身带来的开销。线程池采用预创建的技术,在应用程序启动之后,将立即创建一定数量的线程(N1),放入空闲队列中。这些线程都是处于阻塞(Suspended)状态,不消耗CPU,但占用较小的内存空间。当任务到来后,缓冲池选择一个空闲线程,把任务传入此线程中运行。当N1个线程都在处理任务后,缓冲池自动创建一定数量的新线程,用于处理更多的任务。在任务执行完毕后线程也不退出,而是继续保持在池中等待下一次的任务。当系统比较空闲时,大部分线程都一直处于暂停状态,线程池自动销毁一部分线程,回收系统资源。
基于这种预创建技术,线程池将线程创建和销毁本身所带来的开销分摊到了各个具体的任务上,执行次数越多,每个任务所分担到的线程本身开销则越小,不过我们另外可能需要考虑进去线程之间同步所带来的开销。

构建线程池框架 

一般线程池都必须具备下面几个组成部分: 
线程池管理器:用于创建并管理线程池 
工作线程: 线程池中实际执行的线程 
任务接口: 尽管线程池大多数情况下是用来支持网络服务器,但是我们将线程执行的任务抽象出来,形成任务接口,从而是的线程池与具体的任务无关。 
任务队列:线程池的概念具体到实现则可能是队列,链表之类的数据结构,其中保存执行线程。 

我们实现的通用线程池框架由五个重要部分组成CThreadManage,CThreadPool,CThread,CJob,CWorkerThread,除此之外框架中还包括线程同步使用的类CThreadMutex和CCondition。
CJob是所有的任务的基类,其提供一个接口Run,所有的任务类都必须从该类继承,同时实现Run方法。该方法中实现具体的任务逻辑。 
CThread是Linux中线程的包装,其封装了Linux线程最经常使用的属性和方法,它也是一个抽象类,是所有线程类的基类,具有一个接口Run。 
CWorkerThread是实际被调度和执行的线程类,其从CThread继承而来,实现了CThread中的Run方法。 
CThreadPool是线程池类,其负责保存线程,释放线程以及调度线程。 
CThreadManage是线程池与用户的直接接口,其屏蔽了内部的具体实现。 
CThreadMutex用于线程之间的互斥。 
CCondition则是条件变量的封装,用于线程之间的同步。 
它们的类的继承关系如下图所示: 

线程池的时序很简单,如下图所示。CThreadManage直接跟客户端打交道,其接受需要创建的线程初始个数,并接受客户端提交的任务。这儿的任务是具体的非抽象的任务。CThreadManage的内部实际上调用的都是CThreadPool的相关操作。CThreadPool创建具体的线程,并把客户端提交的任务分发给CWorkerThread,CWorkerThread实际执行具体的任务。

理解系统组件 

下面我们分开来了解系统中的各个组件。 
CThreadManage 
CThreadManage的功能非常简单,其提供最简单的方法,其类定义如下: 
class CThreadManage 

private: 
CThreadPool* m_Pool; 
int m_NumOfThread; 
protected: 
public: 
void SetParallelNum(int num); 
CThreadManage(); 
CThreadManage(int num); 
virtual ~CThreadManage(); 

void Run(CJob* job,void* jobdata); 
void TerminateAll(void); 
}; 
其中m_Pool指向实际的线程池;m_NumOfThread是初始创建时候允许创建的并发的线程个数。另外Run和TerminateAll方法也非常简单,只是简单的调用CThreadPool的一些相关方法而已。其具体的实现如下:
CThreadManage::CThreadManage(){ 
m_NumOfThread = 10; 
m_Pool = new CThreadPool(m_NumOfThread); 

CThreadManage::CThreadManage(int num){ 
m_NumOfThread = num; 
m_Pool = new CThreadPool(m_NumOfThread); 

CThreadManage::~CThreadManage(){ 
if(NULL != m_Pool) 
delete m_Pool; 

void CThreadManage::SetParallelNum(int num){ 
m_NumOfThread = num; 

void CThreadManage::Run(CJob* job,void* jobdata){ 
m_Pool->Run(job,jobdata); 

void CThreadManage::TerminateAll(void){ 
m_Pool->TerminateAll(); 

CThread 
CThread 类实现了对Linux中线程操作的封装,它是所有线程的基类,也是一个抽象类,提供了一个抽象接口Run,所有的CThread都必须实现该Run方法。CThread的定义如下所示:
class CThread 

private: 
int m_ErrCode; 
Semaphore m_ThreadSemaphore; //the inner semaphore, which is used to realize
unsigned long m_ThreadID; 
bool m_Detach; //The thread is detached 
bool m_CreateSuspended; //if suspend after creating 
char* m_ThreadName; 
ThreadState m_ThreadState; //the state of the thread 
protected: 
void SetErrcode(int errcode){m_ErrCode = errcode;} 
static void* ThreadFunction(void*); 
public: 
CThread(); 
CThread(bool createsuspended,bool detach); 
virtual ~CThread(); 
virtual void Run(void) = 0; 
void SetThreadState(ThreadState state){m_ThreadState = state;} 

bool Terminate(void); //Terminate the threa 
bool Start(void); //Start to execute the thread 
void Exit(void); 
bool Wakeup(void); 

ThreadState GetThreadState(void){return m_ThreadState;} 
int GetLastError(void){return m_ErrCode;} 
void SetThreadName(char* thrname){strcpy(m_ThreadName,thrname);} 
char* GetThreadName(void){return m_ThreadName;} 
int GetThreadID(void){return m_ThreadID;} 

bool SetPriority(int priority); 
int GetPriority(void); 
int GetConcurrency(void); 
void SetConcurrency(int num); 
bool Detach(void); 
bool Join(void); 
bool Yield(void); 
int Self(void); 
}; 
线程的状态可以分为四种,空闲、忙碌、挂起、终止(包括正常退出和非正常退出)。由于目前Linux线程库不支持挂起操作,因此,我们的此处的挂起操作类似于暂停。如果线程创建后不想立即执行任务,那么我们可以将其“暂停”,如果需要运行,则唤醒。有一点必须注意的是,一旦线程开始执行任务,将不能被挂起,其将一直执行任务至完毕。
线程类的相关操作均十分简单。线程的执行入口是从Start()函数开始,其将调用函数ThreadFunction,ThreadFunction再调用实际的Run函数,执行实际的任务。

CThreadPool 
CThreadPool是线程的承载容器,一般可以将其实现为堆栈、单向队列或者双向队列。在我们的系统中我们使用STL Vector对线程进行保存。CThreadPool的实现代码如下:
class CThreadPool 

friend class CWorkerThread; 
private: 
unsigned int m_MaxNum; //the max thread num that can create at the same time
unsigned int m_AvailLow; //The min num of idle thread that shoule kept 
unsigned int m_AvailHigh; //The max num of idle thread that kept at the same time
unsigned int m_AvailNum; //the normal thread num of idle num; 
unsigned int m_InitNum; //Normal thread num; 
protected: 
CWorkerThread* GetIdleThread(void); 

void AppendToIdleList(CWorkerThread* jobthread); 
void MoveToBusyList(CWorkerThread* idlethread); 
void MoveToIdleList(CWorkerThread* busythread); 

void DeleteIdleThread(int num); 
void CreateIdleThread(int num); 
public: 
CThreadMutex m_BusyMutex; //when visit busy list,use m_BusyMutex to lock and unlock
CThreadMutex m_IdleMutex; //when visit idle list,use m_IdleMutex to lock and unlock
CThreadMutex m_JobMutex; //when visit job list,use m_JobMutex to lock and unlock
CThreadMutex m_VarMutex; 

CCondition m_BusyCond; //m_BusyCond is used to sync busy thread list 
CCondition m_IdleCond; //m_IdleCond is used to sync idle thread list 
CCondition m_IdleJobCond; //m_JobCond is used to sync job list 
CCondition m_MaxNumCond; 

vector<CWorkerThread*> m_ThreadList; 
vector<CWorkerThread*> m_BusyList; //Thread List 
vector<CWorkerThread*> m_IdleList; //Idle List 

CThreadPool(); 
CThreadPool(int initnum); 
virtual ~CThreadPool(); 

void SetMaxNum(int maxnum){m_MaxNum = maxnum;} 
int GetMaxNum(void){return m_MaxNum;} 
void SetAvailLowNum(int minnum){m_AvailLow = minnum;} 
int GetAvailLowNum(void){return m_AvailLow;} 
void SetAvailHighNum(int highnum){m_AvailHigh = highnum;} 
int GetAvailHighNum(void){return m_AvailHigh;} 
int GetActualAvailNum(void){return m_AvailNum;} 
int GetAllNum(void){return m_ThreadList.size();} 
int GetBusyNum(void){return m_BusyList.size();} 
void SetInitNum(int initnum){m_InitNum = initnum;} 
int GetInitNum(void){return m_InitNum;} 

void TerminateAll(void); 
void Run(CJob* job,void* jobdata); 
}; 
CThreadPool::CThreadPool() 

m_MaxNum = 50; 
m_AvailLow = 5; 
m_InitNum=m_AvailNum = 10 ; 
m_AvailHigh = 20; 

m_BusyList.clear(); 
m_IdleList.clear(); 
for(int i=0;i<m_InitNum;i++){ 
CWorkerThread* thr = new CWorkerThread(); 
thr->SetThreadPool(this); 
AppendToIdleList(thr); 
thr->Start(); 



CThreadPool::CThreadPool(int initnum) 

assert(initnum>0 && initnum<=30); 
m_MaxNum = 30; 
m_AvailLow = initnum-10>0?initnum-10:3; 
m_InitNum=m_AvailNum = initnum ; 
m_AvailHigh = initnum+10; 

m_BusyList.clear(); 
m_IdleList.clear(); 
for(int i=0;i<m_InitNum;i++){ 
CWorkerThread* thr = new CWorkerThread(); 
AppendToIdleList(thr); 
thr->SetThreadPool(this); 
thr->Start(); //begin the thread,the thread wait for job 



CThreadPool::~CThreadPool() 

TerminateAll(); 


void CThreadPool::TerminateAll() 

for(int i=0;i < m_ThreadList.size();i++) { 
CWorkerThread* thr = m_ThreadList[i]; 
thr->Join(); 

return; 


CWorkerThread* CThreadPool::GetIdleThread(void) 

while(m_IdleList.size() ==0 ) 
m_IdleCond.Wait(); 

m_IdleMutex.Lock(); 
if(m_IdleList.size() > 0 ) 

CWorkerThread* thr = (CWorkerThread*)m_IdleList.front(); 
printf("Get Idle thread %dn",thr->GetThreadID()); 
m_IdleMutex.Unlock(); 
return thr; 

m_IdleMutex.Unlock(); 

return NULL; 


//add an idle thread to idle list 
void CThreadPool::AppendToIdleList(CWorkerThread* jobthread) 

m_IdleMutex.Lock(); 
m_IdleList.push_back(jobthread); 
m_ThreadList.push_back(jobthread); 
m_IdleMutex.Unlock(); 


//move and idle thread to busy thread 
void CThreadPool::MoveToBusyList(CWorkerThread* idlethread) 

m_BusyMutex.Lock(); 
m_BusyList.push_back(idlethread); 
m_AvailNum--; 
m_BusyMutex.Unlock(); 

m_IdleMutex.Lock(); 
vector<CWorkerThread*>::iterator pos; 
pos = find(m_IdleList.begin(),m_IdleList.end(),idlethread); 
if(pos !=m_IdleList.end()) 
m_IdleList.erase(pos); 
m_IdleMutex.Unlock(); 


void CThreadPool::MoveToIdleList(CWorkerThread* busythread) 

m_IdleMutex.Lock(); 
m_IdleList.push_back(busythread); 
m_AvailNum++; 
m_IdleMutex.Unlock(); 

m_BusyMutex.Lock(); 
vector<CWorkerThread*>::iterator pos; 
pos = find(m_BusyList.begin(),m_BusyList.end(),busythread); 
if(pos!=m_BusyList.end()) 
m_BusyList.erase(pos); 
m_BusyMutex.Unlock(); 

m_IdleCond.Signal(); 
m_MaxNumCond.Signal(); 


//create num idle thread and put them to idlelist 
void CThreadPool::CreateIdleThread(int num) 

for(int i=0;i<num;i++){ 
CWorkerThread* thr = new CWorkerThread(); 
thr->SetThreadPool(this); 
AppendToIdleList(thr); 
m_VarMutex.Lock(); 
m_AvailNum++; 
m_VarMutex.Unlock(); 
thr->Start(); //begin the thread,the thread wait for job 



void CThreadPool::DeleteIdleThread(int num) 

printf("Enter into CThreadPool::DeleteIdleThreadn"); 
m_IdleMutex.Lock(); 
printf("Delete Num is %dn",num); 
for(int i=0;i<num;i++){ 
CWorkerThread* thr; 
if(m_IdleList.size() > 0 ){ 
thr = (CWorkerThread*)m_IdleList.front(); 
printf("Get Idle thread %dn",thr->GetThreadID()); 


vector<CWorkerThread*>::iterator pos; 
pos = find(m_IdleList.begin(),m_IdleList.end(),thr); 
if(pos!=m_IdleList.end()) 
m_IdleList.erase(pos); 
m_AvailNum--; 
printf("The idle thread available num:%d n",m_AvailNum); 
printf("The idlelist num:%d n",m_IdleList.size()); 

m_IdleMutex.Unlock(); 

void CThreadPool::Run(CJob* job,void* jobdata) 

assert(job!=NULL); 

//if the busy thread num adds to m_MaxNum,so we should wait 
if(GetBusyNum() == m_MaxNum) 
m_MaxNumCond.Wait(); 

if(m_IdleList.size()<m_AvailLow) 

if(GetAllNum()+m_InitNum-m_IdleList.size() < m_MaxNum ) 
CreateIdleThread(m_InitNum-m_IdleList.size()); 
else 
CreateIdleThread(m_MaxNum-GetAllNum()); 


CWorkerThread* idlethr = GetIdleThread(); 
if(idlethr !=NULL) 

idlethr->m_WorkMutex.Lock(); 
MoveToBusyList(idlethr); 
idlethr->SetThreadPool(this); 
job->SetWorkThread(idlethr); 
printf("Job is set to thread %d n",idlethr->GetThreadID()); 
idlethr->SetJob(job,jobdata); 


在CThreadPool中存在两个链表,一个是空闲链表,一个是忙碌链表。Idle链表中存放所有的空闲进程,当线程执行任务时候,其状态变为忙碌状态,同时从空闲链表中删除,并移至忙碌链表中。在CThreadPool的构造函数中,我们将执行下面的代码:
for(int i=0;i<m_InitNum;i++) 

CWorkerThread* thr = new CWorkerThread(); 
AppendToIdleList(thr); 
thr->SetThreadPool(this); 
thr->Start(); //begin the thread,the thread wait for job 

在该代码中,我们将创建m_InitNum个线程,创建之后即调用AppendToIdleList放入Idle链表中,由于目前没有任务分发给这些线程,因此线程执行Start后将自己挂起。
事实上,线程池中容纳的线程数目并不是一成不变的,其会根据执行负载进行自动伸缩。为此在CThreadPool中设定四个变量: 
m_InitNum:处世创建时线程池中的线程的个数。 
m_MaxNum:当前线程池中所允许并发存在的线程的最大数目。 
m_AvailLow:当前线程池中所允许存在的空闲线程的最小数目,如果空闲数目低于该值,表明负载可能过重,此时有必要增加空闲线程池的数目。实现中我们总是将线程调整为m_InitNum个。
m_AvailHigh:当前线程池中所允许的空闲的线程的最大数目,如果空闲数目高于该值,表明当前负载可能较轻,此时将删除多余的空闲线程,删除后调整数也为m_InitNum个。
m_AvailNum:目前线程池中实际存在的线程的个数,其值介于m_AvailHigh和m_AvailLow之间。如果线程的个数始终维持在m_AvailLow和m_AvailHigh之间,则线程既不需要创建,也不需要删除,保持平衡状态。因此如何设定m_AvailLow和m_AvailHigh的值,使得线程池最大可能的保持平衡态,是线程池设计必须考虑的问题。
线程池在接受到新的任务之后,线程池首先要检查是否有足够的空闲池可用。检查分为三个步骤: 
(1)检查当前处于忙碌状态的线程是否达到了设定的最大值m_MaxNum,如果达到了,表明目前没有空闲线程可用,而且也不能创建新的线程,因此必须等待直到有线程执行完毕返回到空闲队列中。
(2)如果当前的空闲线程数目小于我们设定的最小的空闲数目m_AvailLow,则我们必须创建新的线程,默认情况下,创建后的线程数目应该为m_InitNum,因此创建的线程数目应该为( 当前空闲线程数与m_InitNum);但是有一种特殊情况必须考虑,就是现有的线程总数加上创建后的线程数可能超过m_MaxNum,因此我们必须对线程的创建区别对待。
if(GetAllNum()+m_InitNum-m_IdleList.size() < m_MaxNum ) 
CreateIdleThread(m_InitNum-m_IdleList.size()); 
else 
CreateIdleThread(m_MaxNum-GetAllNum()); 
如果创建后总数不超过m_MaxNum,则创建后的线程为m_InitNum;如果超过了,则只创建( m_MaxNum-当前线程总数 )个。 
(3)调用GetIdleThread方法查找空闲线程。如果当前没有空闲线程,则挂起;否则将任务指派给该线程,同时将其移入忙碌队列。 
当线程执行完毕后,其会调用MoveToIdleList方法移入空闲链表中,其中还调用m_IdleCond.Signal()方法,唤醒GetIdleThread()中可能阻塞的线程。

CWorkerThread 
CWorkerThread是CThread的派生类,是事实上的工作线程。在CThreadPool的构造函数中,我们创建了一定数量的CWorkerThread。一旦这些线程创建完毕,我们将调用Start()启动该线程。Start方法最终会调用Run方法。Run方法是个无限循环的过程。在没有接受到实际的任务的时候,m_Job为NULL,此时线程将调用Wait方法进行等待,从而处于挂起状态。一旦线程池将具体的任务分发给该线程,其将被唤醒,从而通知线程从挂起的地方继续执行。CWorkerThread的完整定义如下:
class CWorkerThread:public CThread 

private: 
CThreadPool* m_ThreadPool; 
CJob* m_Job; 
void* m_JobData; 

CThreadMutex m_VarMutex; 
bool m_IsEnd; 
protected: 
public: 
CCondition m_JobCond; 
CThreadMutex m_WorkMutex; 
CWorkerThread(); 
virtual ~CWorkerThread(); 
void Run(); 
void SetJob(CJob* job,void* jobdata); 
CJob* GetJob(void){return m_Job;} 
void SetThreadPool(CThreadPool* thrpool); 
CThreadPool* GetThreadPool(void){return m_ThreadPool;} 
}; 
CWorkerThread::CWorkerThread() 

m_Job = NULL; 
m_JobData = NULL; 
m_ThreadPool = NULL; 
m_IsEnd = false; 

CWorkerThread::~CWorkerThread() 

if(NULL != m_Job) 
delete m_Job; 
if(m_ThreadPool != NULL) 
delete m_ThreadPool; 


void CWorkerThread::Run() 

SetThreadState(THREAD_RUNNING); 
for(;;) 

while(m_Job == NULL) 
m_JobCond.Wait(); 

m_Job->Run(m_JobData); 
m_Job->SetWorkThread(NULL); 
m_Job = NULL; 
m_ThreadPool->MoveToIdleList(this); 
if(m_ThreadPool->m_IdleList.size() > m_ThreadPool->GetAvailHighNum()) 

m_ThreadPool->DeleteIdleThread(m_ThreadPool->m_IdleList.size()-m_T 
hreadPool->GetInitNum()); 

m_WorkMutex.Unlock(); 


void CWorkerThread::SetJob(CJob* job,void* jobdata) 

m_VarMutex.Lock(); 
m_Job = job; 
m_JobData = jobdata; 
job->SetWorkThread(this); 
m_VarMutex.Unlock(); 
m_JobCond.Signal(); 

void CWorkerThread::SetThreadPool(CThreadPool* thrpool) 

m_VarMutex.Lock(); 
m_ThreadPool = thrpool; 
m_VarMutex.Unlock(); 

当线程执行任务之前首先必须判断空闲线程的数目是否低于m_AvailLow,如果低于,则必须创建足够的空闲线程,使其数目达到m_InitNum个,然后将调用MoveToBusyList()移出空闲队列,移入忙碌队列。当任务执行完毕后,其又调用MoveToIdleList()移出忙碌队列,移入空闲队列,等待新的任务。
除了Run方法之外,CWorkerThread中另外一个重要的方法就是SetJob,该方法将实际的任务赋值给线程。当没有任何执行任务即m_Job为NULL的时候,线程将调用m_JobCond.Wait进行等待。一旦Job被赋值给线程,其将调用m_JobCond.Signal方法唤醒该线程。由于m_JobCond属于线程内部的变量,每个线程都维持一个m_JobCond,只有得到任务的线程才被唤醒,没有得到任务的将继续等待。无论一个线程何时被唤醒,其都将从等待的地方继续执行m_Job->Run(m_JobData),这是线程执行实际任务的地方。
在线程执行给定Job期间,我们必须防止另外一个Job又赋给该线程,因此在赋值之前,通过m_VarMutex进行锁定, Job执行期间,其于的Job将不能关联到该线程;任务执行完毕,我们调用m_VarMutex.Unlock()进行解锁,此时,线程又可以接受新的执行任务。
在线程执行任务结束后返回空闲队列前,我们还需要判断当前空闲队列中的线程是否高于m_AvailHigh个。如果超过m_AvailHigh,则必须从其中删除(m_ThreadPool->m_IdleList.size()-m_ThreadPool->GetInitNum())个线程,使线程数目保持在m_InitNum个。

CJob 
CJob类相对简单,其封装了任务的基本的属性和方法,其中最重要的是Run方法,代码如下: 
class CJob 

private: 
int m_JobNo; //The num was assigned to the job 
char* m_JobName; //The job name 
CThread *m_pWorkThread; //The thread associated with the job 
public: 
CJob( void ); 
virtual ~CJob(); 

int GetJobNo(void) const { return m_JobNo; } 
void SetJobNo(int jobno){ m_JobNo = jobno;} 
char* GetJobName(void) const { return m_JobName; } 
void SetJobName(char* jobname); 
CThread *GetWorkThread(void){ return m_pWorkThread; } 
void SetWorkThread ( CThread *pWorkThread ){ 
m_pWorkThread = pWorkThread; 

virtual void Run ( void *ptr ) = 0; 
}; 
CJob::CJob(void) 
:m_pWorkThread(NULL) 
,m_JobNo(0) 
,m_JobName(NULL) 


CJob::~CJob(){ 
if(NULL != m_JobName) 
free(m_JobName); 

void CJob::SetJobName(char* jobname) 

if(NULL !=m_JobName) { 
free(m_JobName); 
m_JobName = NULL; 

if(NULL !=jobname) { 
m_JobName = (char*)malloc(strlen(jobname)+1); 
strcpy(m_JobName,jobname); 


线程池使用示例 
至此我们给出了一个简单的与具体任务无关的线程池框架。使用该框架非常的简单,我们所需要的做的就是派生CJob类,将需要完成的任务实现在Run方法中。然后将该Job交由CThreadManage去执行。下面我们给出一个简单的示例程序
class CXJob:public CJob 

public: 
CXJob(){i=0;} 
~CXJob(){} 
void Run(void* jobdata) { 
printf("The Job comes from CXJOB\n"); 
sleep(2); 

}; 

class CYJob:public CJob 

public: 
CYJob(){i=0;} 
~CYJob(){} 
void Run(void* jobdata) { 
printf("The Job comes from CYJob\n"); 

}; 

main() 

CThreadManage* manage = new CThreadManage(10); 
for(int i=0;i<40;i++) 

CXJob* job = new CXJob(); 
manage->Run(job,NULL); 

sleep(2); 
CYJob* job = new CYJob(); 
manage->Run(job,NULL); 
manage->TerminateAll(); 

CXJob和CYJob都是从Job类继承而来,其都实现了Run接口。CXJob只是简单的打印一句”The Job comes from CXJob”,CYJob也只打印”The Job comes from CYJob”,然后均休眠2秒钟。在主程序中我们初始创建10个工作线程。然后分别执行40次CXJob和一次CYJob。
线程池使用后记 
线程池适合场合 
事实上,线程池并不是万能的。它有其特定的使用场合。线程池致力于减少线程本身的开销对应用所产生的影响,这是有前提的,前提就是线程本身开销与线程执行任务相比不可忽略。如果线程本身的开销相对于线程任务执行开销而言是可以忽略不计的,那么此时线程池所带来的好处是不明显的,比如对于FTP服务器以及Telnet服务器,通常传送文件的时间较长,开销较大,那么此时,我们采用线程池未必是理想的方法,我们可以选择“即时创建,即时销毁”的策略。
总之线程池通常适合下面的几个场合: 
(1) 单位时间内处理任务频繁而且任务处理时间短 
(2) 对实时性要求较高。如果接受到任务后在创建线程,可能满足不了实时要求,因此必须采用线程池进行预创建。 
(3) 必须经常面对高突发性事件,比如Web服务器,如果有足球转播,则服务器将产生巨大的冲击。此时如果采取传统方法,则必须不停的大量产生线程,销毁线程。此时采用动态线程池可以避免这种情况的发生。

结束语 
本文给出了一个简单的通用的与任务无关的线程池的实现,通过该线程池能够极大的简化Linux下多线程的开发工作。该线程池的进一步完善开发工作还在进行中,希望能够得到你的建议和支持。
参考资料 
http://www-900.ibm.com/developerWorks/cn/java/j-jtp0730/index.shtml 
POSIX多线程程序设计,David R.Butenhof 译者:于磊 曾刚,中国电力出版社 
C++面向对象多线程编程,CAMERON HUGHES等著 周良忠译,人民邮电出版社 
Java Pro,结合线程和分析器池,Edy Yu 
关于作者 
张中庆,西安交通大学软件所,在读硕士,目前研究方向为分布式网络与移动中间件,对Linux极其爱好,可以通过flydish1234@sina.com.cn与我联系。

C++实现线程池的经典模型_Linux编程_Linux公社-Linux系统门户网站 - Google Chrome (2013/7/1 10:23:31)

C++实现线程池的经典模型_Linux编程_Linux公社-Linux系统门户网站 - Google Chrome (2013/7/1 10:23:31)

什么时候需要创建线程池呢?简单的说,如果一个应用需要频繁的创建和销毁线程,而任务执行的时间又非常短,这样线程创建和销毁的带来的开销就不容忽视,这时也是线程池该出场的机会了。如果线程创建和销毁时间相比任务执行时间可以忽略不计,则没有必要使用线程池了。

下面列出线程的一些重要的函数

int pthread_create(pthread_t *thread, const pthread_attr_t *attr,  
                  void *(*start_routine) (void *), void *arg);  
void pthread_exit(void *retval);  
  
int pthread_mutex_destroy(pthread_mutex_t *mutex);  
int pthread_mutex_init(pthread_mutex_t *restrict mutex,  
                      const pthread_mutexattr_t *restrict attr);  
int pthread_mutex_lock(pthread_mutex_t *mutex);  
int pthread_mutex_trylock(pthread_mutex_t *mutex);  
int pthread_mutex_unlock(pthread_mutex_t *mutex);  
  
int pthread_cond_destroy(pthread_cond_t *cond);  
int pthread_cond_init(pthread_cond_t *restrict cond,  
                      const pthread_condattr_t *restrict attr);  
int pthread_cond_destroy(pthread_cond_t *cond);  
int pthread_cond_init(pthread_cond_t *restrict cond,  
                      const pthread_condattr_t *restrict attr);  
int pthread_cond_broadcast(pthread_cond_t *cond);  
int pthread_cond_signal(pthread_cond_t *cond); 


C语言实现简单线程池 - Newerth - 博客园 - Google Chrome (2013/7/1 10:39:41)


C语言实现简单线程池

有时我们会需要大量线程来处理一些相互独立的任务,为了避免频繁的申请释放线程所带来的开销,我们可以使用线程池。下面是一个C语言实现的简单的线程池。

头文件:

   1: #ifndef THREAD_POOL_H__
   2: #define THREAD_POOL_H__
   3:  
   4: #include <pthread.h>
   5:  
   6: /* 要执行的任务链表 */
   7: typedef struct tpool_work {
   8:     void*               (*routine)(void*);       /* 任务函数 */
   9:     void                *arg;                    /* 传入任务函数的参数 */
  10:     struct tpool_work   *next;                    
  11: }tpool_work_t;
  12:  
  13: typedef struct tpool {
  14:     int             shutdown;                    /* 线程池是否销毁 */
  15:     int             max_thr_num;                /* 最大线程数 */
  16:     pthread_t       *thr_id;                    /* 线程ID数组 */
  17:     tpool_work_t    *queue_head;                /* 线程链表 */
  18:     pthread_mutex_t queue_lock;                    
  19:     pthread_cond_t  queue_ready;    
  20: }tpool_t;
  21:  
  22: /*
  23:  * @brief     创建线程池 
  24:  * @param     max_thr_num 最大线程数
  25:  * @return     0: 成功 其他: 失败  
  26:  */
  27: int
  28: tpool_create(int max_thr_num);
  29:  
  30: /*
  31:  * @brief     销毁线程池 
  32:  */
  33: void
  34: tpool_destroy();
  35:  
  36: /*
  37:  * @brief     向线程池中添加任务
  38:  * @param    routine 任务函数指针
  39:  * @param     arg 任务函数参数
  40:  * @return     0: 成功 其他:失败 
  41:  */
  42: int
  43: tpool_add_work(void*(*routine)(void*), void *arg);
  44:  
  45: #endif

实现:

   1: #include <unistd.h>
   2: #include <stdlib.h>
   3: #include <errno.h>
   4: #include <string.h>
   5: #include <stdio.h>
   6:  
   7: #include "tpool.h"
   8:  
   9: static tpool_t *tpool = NULL;
  10:  
  11: /* 工作者线程函数, 从任务链表中取出任务并执行 */
  12: static void* 
  13: thread_routine(void *arg)
  14: {
  15:     tpool_work_t *work;
  16:     
  17:     while(1) {
  18:         /* 如果线程池没有被销毁且没有任务要执行,则等待 */
  19:         pthread_mutex_lock(&tpool->queue_lock);
  20:         while(!tpool->queue_head && !tpool->shutdown) {
  21:             pthread_cond_wait(&tpool->queue_ready, &tpool->queue_lock);
  22:         }
  23:         if (tpool->shutdown) {
  24:             pthread_mutex_unlock(&tpool->queue_lock);
  25:             pthread_exit(NULL);
  26:         }
  27:         work = tpool->queue_head;
  28:         tpool->queue_head = tpool->queue_head->next;
  29:         pthread_mutex_unlock(&tpool->queue_lock);
  30:  
  31:         work->routine(work->arg);
  32:         free(work);
  33:     }
  34:     
  35:     return NULL;   
  36: }
  37:  
  38: /*
  39:  * 创建线程池 
  40:  */
  41: int
  42: tpool_create(int max_thr_num)
  43: {
  44:     int i;
  45:  
  46:     tpool = calloc(1, sizeof(tpool_t));
  47:     if (!tpool) {
  48:         printf("%s: calloc failed\n", __FUNCTION__);
  49:         exit(1);
  50:     }
  51:     
  52:     /* 初始化 */
  53:     tpool->max_thr_num = max_thr_num;
  54:     tpool->shutdown = 0;
  55:     tpool->queue_head = NULL;
  56:     if (pthread_mutex_init(&tpool->queue_lock, NULL) !=0) {
  57:         printf("%s: pthread_mutex_init failed, errno:%d, error:%s\n",
  58:             __FUNCTION__, errno, strerror(errno));
  59:         exit(1);
  60:     }
  61:     if (pthread_cond_init(&tpool->queue_ready, NULL) !=0 ) {
  62:         printf("%s: pthread_cond_init failed, errno:%d, error:%s\n", 
  63:             __FUNCTION__, errno, strerror(errno));
  64:         exit(1);
  65:     }
  66:     
  67:     /* 创建工作者线程 */
  68:     tpool->thr_id = calloc(max_thr_num, sizeof(pthread_t));
  69:     if (!tpool->thr_id) {
  70:         printf("%s: calloc failed\n", __FUNCTION__);
  71:         exit(1);
  72:     }
  73:     for (i = 0; i < max_thr_num; ++i) {
  74:         if (pthread_create(&tpool->thr_id[i], NULL, thread_routine, NULL) != 0){
  75:             printf("%s:pthread_create failed, errno:%d, error:%s\n", __FUNCTION__, 
  76:                 errno, strerror(errno));
  77:             exit(1);
  78:         }
  79:         
  80:     }    
  81:  
  82:     return 0;
  83: }
  84:  
  85: /* 销毁线程池 */
  86: void
  87: tpool_destroy()
  88: {
  89:     int i;
  90:     tpool_work_t *member;
  91:  
  92:     if (tpool->shutdown) {
  93:         return;
  94:     }
  95:     tpool->shutdown = 1;
  96:  
  97:     /* 通知所有正在等待的线程 */
  98:     pthread_mutex_lock(&tpool->queue_lock);
  99:     pthread_cond_broadcast(&tpool->queue_ready);
 100:     pthread_mutex_unlock(&tpool->queue_lock);
 101:     for (i = 0; i < tpool->max_thr_num; ++i) {
 102:         pthread_join(tpool->thr_id[i], NULL);
 103:     }
 104:     free(tpool->thr_id);
 105:  
 106:     while(tpool->queue_head) {
 107:         member = tpool->queue_head;
 108:         tpool->queue_head = tpool->queue_head->next;
 109:         free(member);
 110:     }
 111:  
 112:     pthread_mutex_destroy(&tpool->queue_lock);    
 113:     pthread_cond_destroy(&tpool->queue_ready);
 114:  
 115:     free(tpool);    
 116: }
 117:  
 118: /* 向线程池添加任务 */
 119: int
 120: tpool_add_work(void*(*routine)(void*), void *arg)
 121: {
 122:     tpool_work_t *work, *member;
 123:     
 124:     if (!routine){
 125:         printf("%s:Invalid argument\n", __FUNCTION__);
 126:         return -1;
 127:     }
 128:     
 129:     work = malloc(sizeof(tpool_work_t));
 130:     if (!work) {
 131:         printf("%s:malloc failed\n", __FUNCTION__);
 132:         return -1;
 133:     }
 134:     work->routine = routine;
 135:     work->arg = arg;
 136:     work->next = NULL;
 137:  
 138:     pthread_mutex_lock(&tpool->queue_lock);    
 139:     member = tpool->queue_head;
 140:     if (!member) {
 141:         tpool->queue_head = work;
 142:     } else {
 143:         while(member->next) {
 144:             member = member->next;
 145:         }
 146:         member->next = work;
 147:     }
 148:     /* 通知工作者线程,有新任务添加 */
 149:     pthread_cond_signal(&tpool->queue_ready);
 150:     pthread_mutex_unlock(&tpool->queue_lock);
 151:  
 152:     return 0;    
 153: }
 154:     
 155:  

测试代码:

   1: #include <unistd.h>
   2: #include <stdio.h>
   3: #include <stdlib.h>
   4: #include "tpool.h"
   5:  
   6: void *func(void *arg)
   7: {
   8:     printf("thread %d\n", (int)arg);
   9:     return NULL;
  10: }
  11:  
  12: int
  13: main(int arg, char **argv)
  14: {
  15:     if (tpool_create(5) != 0) {
  16:         printf("tpool_create failed\n");
  17:         exit(1);
  18:     }
  19:     
  20:     int i;
  21:     for (i = 0; i < 10; ++i) {
  22:         tpool_add_work(func, (void*)i);
  23:     }
  24:     sleep(2);
  25:     tpool_destroy();
  26:     return 0;
  27: }

这个实现是在调用tpool_destroy之后,仅将当前正在执行的任务完成之后就会退出,我们也可以修改代码使得线程池在执行完任务链表中所有任务后再退出。