linux线程池thrmgr源码解析

时间:2021-08-22 21:56:27

linux线程池thrmgr源码解析

1         thrmgr线程池的作用

thrmgr线程池的作用是提高程序的并发处理能力,在多CPU的服务器上运行程序,可以并发执行多个任务。

2         thrmgr线程池的原理

thrmgr并非像常规线程池那样,创建线程池时,创建固定数量的线程,线程一直存在,直到线程池被销毁。Thrmgr创建时只是分配线程池对象的变量,并初始化锁、条件变量等变量,并没有创建线程。当向线程池加入第一个任务时,开始创建第一个线程,处理任务,如果一直有任务,则会一直处理任务,如果加入的任务太多太快,线程池内线程都在忙碌,没有空闲等待线程,而且线程池中存在的线程数量不超过最大值,则会创建新的线程去执行任务。线程数量会增多,等到任务数量减少时,有些线程没有任务了,处于等待状态,而且等待超时,这些等待超时的线程就会自动返回,自动退出线程,线程池内活动线程数量减少。这样线程池内线程的数量会随着任务的数量动态调整。即避免了任务量大时,线程池处理不过来,又避免了任务少时,线程池内部存在大量空闲线程的缺陷。从实现了一种根据任务量动态调整的线程池。

3         线程池基础知识

3.1  linux线程属性pthread_attr_t

typedef struct

{

int                               detachstate;   线程的分离状态

int                               schedpolicy;  线程调度策略

structsched_param              schedparam;  线程的调度参数

int                               inheritsched;  线程的继承性

int                                scope;       线程的作用域

size_t                           guardsize;   线程栈末尾的警戒缓冲区大小

int                                stackaddr_set;

void*                          stackaddr;   线程栈的位置

size_t                           stacksize;    线程栈的大小

}pthread_attr_t;

线程具有属性,用pthread_attr_t表示,在对该结构进行处理之前必须进行初始化,在使用后需要对其去除初始化。我们用pthread_attr_init函数对其初始化,用pthread_attr_destroy对其去除初始化。

名称::

pthread_attr_init/pthread_attr_destroy

功能:

对线程属性初始化/去除初始化

头文件:

#include<pthread.h>

函数原形:

int   pthread_attr_init(pthread_attr_t*attr);

int   pthread_attr_destroy(pthread_attr_t*attr);

参数:

Attr   线程属性变量

返回值:

若成功返回0,若失败返回-1。

3.2  linux线程的分离状态

线程的分离状态决定一个线程以什么样的方式来终止自己。在默认情况下线程是非分离状态的,这种情况下,原有的线程等待创建的线程结束。只有当pthread_join()函数返回时,创建的线程才算终止,才能释放自己占用的系统资源。而分离线程不是这样子的,它没有被其他的线程所等待,自己运行结束了,线程也就终止了,马上释放系统资源。线程池中需要的就是分离状态的线程。则可以设置pthread_attr_t结构中的detachstate线程属性为PTHREAD_CREATE_DETACHED,让线程以分离状态启动。

名称::

pthread_attr_getdetachstate/pthread_attr_setdetachstate

功能:

获取/修改线程的分离状态属性

头文件:

#include<pthread.h>

函数原形:

int   pthread_attr_getdetachstate(const pthread_attr_t *attr,int *detachstate);

int   pthread_attr_setdetachstate(pthread_attr_t *attr,intdetachstate);

参数:

Attr   线程属性变量

Detachstate  线程的分离状态属性

返回值:

若成功返回0,若失败返回-1。

3.3  Linux互斥量pthread_mutex_t

pthread_mutex_t在线程池中的作用是为了避免多线程对公共变量同时进行访问。在访问变量前,先锁住,不让别人访问,访问结束后在释放锁,让别人访问。保证原子性操作,避免多个线程同时修改公共变量。pthread_mutex_lock(&threadpool->pool_mutex);上锁。pthread_mutex_unlock(&threadpool->pool_mutex);进行解锁,两个函数要配对使用。

3.4  Linux条件变量pthread_cond_t

条件变量是利用线程间共享的全局变量进行同步的一种机制,通常是要和互斥量pthread_mutex_t配合使用。pthread_cond_t在线程池中的作用是无任务时,让线程处于等待状态,等待条件变量有信号,挂起线程。当有任务时,发出信号,让线程脱离等待状态,开始执行任务。线程等待超时,则自动退出,结束线程。

(1)初始化和反初始化

int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);

int pthread_cond_destroy(pthread_cond_t *cond);

(2)等待信号函数

int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);//一直等待

int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime);//超时等待

因为采用的是绝对时间,所以先要获取系统时间,在加上超时时间,在传入参数。

timeout.tv_sec = time(NULL) + threadpool->idle_timeout;

timeout.tv_nsec = 0;

pthread_cond_wait函数需要传入mutex变量,是因为在函数内部需要对mutex进行操作,内部流程是:更新条件等待队列->释放锁->等待信号。。。->信号到来->上锁->更新条件等待队列->执行pthread_cond_wait后面的代码。在pthread_cond_wait内部有一个先解锁再加锁的过程,所以pthread_cond_wait要和pthread_mutex_t配合使用,而且,pthread_cond_wait需要在加锁和解锁之间,总的流程就是外部加锁->内部更新条件等待队列->内部释放锁->内部等待信号。。。->信号到来->内部上锁->内部更新条件等待队列->执行pthread_cond_wait后面的代码->外部解锁。

(3)发送信号

pthread_cond_signal();发出一个信号,激活一个等待该条件的线程,存在多个等待线程时按入队顺序激活其中一个;

pthread_cond_broadcast();广播方式发送信号,所有pthread_cond_wait获取到信号返回,激活所有等待线程。

4         线程池的代码实现

/*
* Copyright (C) 2004 Trog <trog@clamav.net>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
* MA 02110-1301, USA.
*/ #ifndef __THRMGR_H__
#define __THRMGR_H__ #include <pthread.h> #ifndef C_WINDOWS
#include <sys/time.h>
#endif typedef struct work_item_tag {
struct work_item_tag *next;
void *data;
struct timeval time_queued;//
} work_item_t; typedef struct work_queue_tag {
work_item_t *head;
work_item_t *tail;
int item_count;
} work_queue_t; typedef enum {
POOL_INVALID,
POOL_VALID,
POOL_EXIT
} pool_state_t; typedef struct threadpool_tag {
pthread_mutex_t pool_mutex;//mutex锁,用于限制同时只有一个线程对公共资源(条件变量,线程池内部变量)访问修改
pthread_cond_t pool_cond;//条件变量,用于多线程间等待任务,有任务的信号控制
pthread_attr_t pool_attr;//线程的属性,主要为了设置分离线程属性,即线程循环退出自动结束线程,释放资源,不用调用函数去释放资源
//具体可参考https://www.cnblogs.com/lidabo/p/5514222.html对属性的解释
pool_state_t state;//线程池的状态
int thr_max;//线程池最大线程数量
int thr_alive;//活着的线程数量,包括正在执行任务的线程和空闲等待线程
int thr_idle;//空闲等待的线程。
int idle_timeout;//线程等待超时时间,超时结束后,将结束本线程 void (*handler)(void *);//任务操作函数,有用户传入函数指针 work_queue_t *queue;//任务队列,以单向链表的方式存储任务
} threadpool_t;
/*
功能:新建线程池
参数:
int max_threads, 最大线程数
int idle_timeout,超时时间
void (*handler)(void *)函数操作句柄
*/
threadpool_t *thrmgr_new(int max_threads, int idle_timeout, void (*handler)(void *));
/*
功能:销毁线程池
参数:
threadpool_t *threadpool线程池指针
*/
void thrmgr_destroy(threadpool_t *threadpool);
/*
功能:给线程池下发任务
参数:
threadpool_t *threadpool线程池指针
void *user_data 用户要处理的数据
*/
int thrmgr_dispatch(threadpool_t *threadpool, void *user_data); #endif
/*
* Copyright (C) 2004 Trog <trog@clamav.net>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
* MA 02110-1301, USA.
*/ #if HAVE_CONFIG_H
#include "clamav-config.h"
#endif #include <stdio.h>
#include <pthread.h>
#include <time.h>
#include <errno.h> #include "shared/output.h" #include "thrmgr.h"
#include "others.h" #define FALSE (0)
#define TRUE (1)
/*新建任务队列,初始化队列参数*/
static work_queue_t *work_queue_new(void)
{
work_queue_t *work_q; work_q = (work_queue_t *) malloc(sizeof(work_queue_t));
if (!work_q) {
return NULL;
} work_q->head = work_q->tail = NULL;
work_q->item_count = ;
return work_q;
}
/*向任务队列中加入任务*/
static int work_queue_add(work_queue_t *work_q, void *data)
{
work_item_t *work_item; if (!work_q) {
return FALSE;
}
//申请任务内存
work_item = (work_item_t *) malloc(sizeof(work_item_t));
if (!work_item) {
return FALSE;
}
//next指针设为空,将用户数据赋值给任务
work_item->next = NULL;
work_item->data = data;
//设置任务接收时间,好像没有什么用
gettimeofday(&(work_item->time_queued), NULL);
//第一次插入任务首尾指针都为空,所以同时指向这个任务
if (work_q->head == NULL) {
work_q->head = work_q->tail = work_item;
work_q->item_count = ;
} else {//以后插入时,将结尾next指针指向插入任务,
//然后将位置指针指向最后插入的任务,
//相当于将任务加入单向链表的末尾,然后在将尾指针指向最后一个
work_q->tail->next = work_item;
work_q->tail = work_item;
work_q->item_count++;//任务数量加1
}
return TRUE;
} static void *work_queue_pop(work_queue_t *work_q)
{
work_item_t *work_item;
void *data;
//头指针为空,无数据返回
if (!work_q || !work_q->head) {
return NULL;
}
//获取链表中第一个任务
work_item = work_q->head;
//获取用户数据
data = work_item->data;
//将头指针向后移动一位
work_q->head = work_item->next;
//如果头指针是空,说明刚刚去除的任务已经是最后一个任务,需要把尾指针也置为空
if (work_q->head == NULL) {
work_q->tail = NULL;
}
//销毁任务框架内容,返回用户数据
free(work_item);
return data;
} void thrmgr_destroy(threadpool_t *threadpool)
{
if (!threadpool || (threadpool->state != POOL_VALID)) {
return;
}
//上锁
if (pthread_mutex_lock(&threadpool->pool_mutex) != ) {
logg("!Mutex lock failed\n");
exit(-);
}
//设置线程池状态为退出
threadpool->state = POOL_EXIT; /* wait for threads to exit */
//线程池中有活的线程,广播信号变量,让所有线程都获取到信号,自动返回结束线程
if (threadpool->thr_alive > ) {
if (pthread_cond_broadcast(&(threadpool->pool_cond)) != ) {
pthread_mutex_unlock(&threadpool->pool_mutex);
return;
}
}
while (threadpool->thr_alive > ) {//等待最后一个线程结束时,即thr_alive==0时,会广播一个信号,告诉所有线程都结束了
if (pthread_cond_wait (&threadpool->pool_cond, &threadpool->pool_mutex) != ) {
pthread_mutex_unlock(&threadpool->pool_mutex);
return;
}
}
if (pthread_mutex_unlock(&threadpool->pool_mutex) != ) {
logg("!Mutex unlock failed\n");
exit(-);
}
//释放资源
pthread_mutex_destroy(&(threadpool->pool_mutex));
pthread_cond_destroy(&(threadpool->pool_cond));
pthread_attr_destroy(&(threadpool->pool_attr));
free(threadpool->queue);
free(threadpool);
return;
} threadpool_t *thrmgr_new(int max_threads, int idle_timeout, void (*handler)(void *))
{
threadpool_t *threadpool;
#if defined(C_BIGSTACK) || defined(C_BSD)
size_t stacksize;
#endif if (max_threads <= ) {
return NULL;
}
//创建线程池对象
threadpool = (threadpool_t *) malloc(sizeof(threadpool_t));
if (!threadpool) {
return NULL;
}
//创建任务队列
threadpool->queue = work_queue_new();
if (!threadpool->queue) {
free(threadpool);
return NULL;
}
//线程池创建只是创建对象,没有启动任何线程,线程是在接收到任务才开始创建线程
threadpool->thr_max = max_threads;
threadpool->thr_alive = ;
threadpool->thr_idle = ;
threadpool->idle_timeout = idle_timeout;
threadpool->handler = handler;
//初始化锁
pthread_mutex_init(&(threadpool->pool_mutex), NULL);
if (pthread_cond_init(&(threadpool->pool_cond), NULL) != ) {
pthread_mutex_destroy(&(threadpool->pool_mutex));
free(threadpool->queue);
free(threadpool);
return NULL;
}
//初始化线程属性对象
if (pthread_attr_init(&(threadpool->pool_attr)) != ) {
pthread_cond_destroy(&(threadpool->pool_cond));
pthread_mutex_destroy(&(threadpool->pool_mutex));
free(threadpool->queue);
free(threadpool);
return NULL;
}
//将线程属性对象参数设置为分离的线程属性(PTHREAD_CREATE_DETACHED),即线程执行到末尾,自动回收资源,不用调用回收函数来回收
if (pthread_attr_setdetachstate(&(threadpool->pool_attr), PTHREAD_CREATE_DETACHED) != ) {
pthread_attr_destroy(&(threadpool->pool_attr));
pthread_cond_destroy(&(threadpool->pool_cond));
pthread_mutex_destroy(&(threadpool->pool_mutex));
free(threadpool->queue);
free(threadpool);
return NULL;
}
//设置线程堆栈大小
#if defined(C_BIGSTACK) || defined(C_BSD)
pthread_attr_getstacksize(&(threadpool->pool_attr), &stacksize);
stacksize = stacksize + * ;
if (stacksize < ) stacksize = ; /* at least 1MB please */
logg("Set stacksize to %u\n", stacksize);
pthread_attr_setstacksize(&(threadpool->pool_attr), stacksize);
#endif
threadpool->state = POOL_VALID;
//线程池状态设置为可用状态
return threadpool;
} static void *thrmgr_worker(void *arg)
{
threadpool_t *threadpool = (threadpool_t *) arg;
void *job_data;
int retval, must_exit = FALSE;
struct timespec timeout; /* loop looking for work */
for (;;)
{//锁住,要修改公共变量了threadpool,如果这里锁住,pthread_cond_timedwait等待状态,后面的释放锁不执行,加入任务的函数thrmgr_dispatch中pthread_cond_signal信号也是在锁内部,
//不就一直发不出去,那么这边在等,不释放锁,那边在争锁,又发不出去,不是死锁了吗。其实不然,pthread_cond_timedwait内部会先释放锁,然后等待信号,然后thrmgr_dispatch就可以进入锁发送信号了。
//然后这边等到信号后再锁住,修改信号状态值,然后返回函数。pthread_cond_timedwait内部有一个释放锁,再锁住的过程。
if (pthread_mutex_lock(&(threadpool->pool_mutex)) != ) {
/* Fatal error */
logg("!Fatal: mutex lock failed\n");
exit(-);
}
timeout.tv_sec = time(NULL) + threadpool->idle_timeout;
timeout.tv_nsec = ;
//等待线程数量加1
threadpool->thr_idle++;
//获取到任务退出循环,否则进入等待
while (((job_data=work_queue_pop(threadpool->queue)) == NULL)&& (threadpool->state != POOL_EXIT))
{
/* Sleep, awaiting wakeup */
//接收到第一个任务时,才会 创建第一个线程,所以第一次会接收到信号并处理,第二次循环如果没有任务,
//则会在这里等待,线程被挂起,超时后往下执行
retval = pthread_cond_timedwait(&(threadpool->pool_cond),&(threadpool->pool_mutex), &timeout);
//如果是超时的,说明空闲超时,需要结束该线程,设置变量,让线程自动结束
if (retval == ETIMEDOUT) {
must_exit = TRUE;
break;//跳出等待
}
}//等待结束,等待线程数量减1
threadpool->thr_idle--;
//如果线程状态为退出,也将must_exit置为true
if (threadpool->state == POOL_EXIT) {
must_exit = TRUE;
}
//threadpool变量修改结束,释放锁
if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != ) {
/* Fatal error */
logg("!Fatal: mutex unlock failed\n");
exit(-);
}//如果任务不为空,执行任务
if (job_data) {
threadpool->handler(job_data);
}
else if (must_exit)//如果是线程空闲太久或者线程池状态为退出,则退出线程
{
break;
}
}
//又要操作公共变量threadpool->thr_alive了,锁住,不让别人来打扰
if (pthread_mutex_lock(&(threadpool->pool_mutex)) != ) {
/* Fatal error */
logg("!Fatal: mutex lock failed\n");
exit(-);
}
//线程即将推出,将活着的线程数量减1
threadpool->thr_alive--;
//最后一个线程了,临死前发出最后一个广播信号,告诉destory函数最后一个线程已经阵亡,可以结束了
if (threadpool->thr_alive == ) {
/* signal that all threads are finished */
pthread_cond_broadcast(&threadpool->pool_cond);
}
//修改公共变量结束,释放锁
if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != ) {
/* Fatal error */
logg("!Fatal: mutex unlock failed\n");
exit(-);
}
return NULL;
}
/*向线程池下发任务*/
int thrmgr_dispatch(threadpool_t *threadpool, void *user_data)
{
pthread_t thr_id; if (!threadpool) {
return FALSE;
} /* Lock the threadpool */
//需要访问公共对象任务队列了,开始锁住
if (pthread_mutex_lock(&(threadpool->pool_mutex)) != ) {
logg("!Mutex lock failed\n");
return FALSE;
} if (threadpool->state != POOL_VALID) {
if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != ) {
logg("!Mutex unlock failed\n");
return FALSE;
}
return FALSE;
}//向任务队列中加入任务
if (!work_queue_add(threadpool->queue, user_data))
{
if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != ) {
logg("!Mutex unlock failed\n");
return FALSE;
}
return FALSE;
}
//空闲等待线程数量为0,要么任务太多,要么第一次添加任务,线程池内线程数量为空
if ((threadpool->thr_idle == ) &&(threadpool->thr_alive < threadpool->thr_max))
{
/* Start a new thread */
//启动线程
if (pthread_create(&thr_id, &(threadpool->pool_attr),thrmgr_worker, threadpool) != )
{
logg("!pthread_create failed\n");
}
else
{
threadpool->thr_alive++;//成功创建,活线程数量加1
}
}
//发送一个信号,告诉空闲等待线程,有任务了
pthread_cond_signal(&(threadpool->pool_cond)); if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != ) {
logg("!Mutex unlock failed\n");
return FALSE;
}
return TRUE;
}

参考文献

(1)https://www.cnblogs.com/secondtonone1/p/5580203.html 条件变量

(2)https://www.cnblogs.com/lidabo/p/5514222.html  线程属性

(3)http://en.verysource.com/clamav_sourcecode_rar-download-121916.html 线程池源码

自己开发了一个股票智能分析软件,功能很强大,需要的点击下面的链接获取:

https://www.cnblogs.com/bclshuai/p/11380657.html

百度云盘下载地址:

链接:https://pan.baidu.com/s/1swkQzCIKI3g3ObcebgpIDg

提取码:mc8l

微信公众号获取最新的软件和视频介绍

QStockView

linux线程池thrmgr源码解析