11.2 Win2008以上的新线程池
(1)传统线程池的优缺点:
①传统Windows线程池调用简单,使用方便(有时只需调用一个API即可)
②这种简单也带来负面问题,如接口过于简单,无法更多去控制线程池的行为。
(2)Windows2008新线程池及API
线程池对象 |
传统API |
Win2008及以上平台新API |
普通任务线程池 |
QueueUserWorkItem |
CreateThreadpoolWork |
TrySubmitThreadpoolWaitCallbacks |
||
SubmitThreadpoolWork |
||
CloseThreadpoolWork |
||
计时器线程池 |
CreateTimerQueue(创建线程池) |
CreateThreadpoolTimer |
CreateTimerQueueTimer(创建计时器) |
SetThreadpoolTimer |
|
ChangeTimerQueueTimer |
IsThreadpoolTimerSet |
|
DeleteTimerQueueTimer |
WaitForThreadpoolTimerCallbacks |
|
DeteTimerQueueEx |
CloseThreadpoolTimer |
|
同步对象等待线程池 |
RegisterWaitForSingleObject |
CreateThreadpoolWait |
UnregisterWaitEx |
SetThreadpoolWait |
|
WaitForThreadpoolWaitCallbacks |
||
CloseThreadpoolWait |
||
完成端口线程池 |
BindIoCompletionCallback |
CreateThreadpoolIo |
StartThreadpoolIo |
||
CancelThreadpoolIo |
||
WaitForThreadpoolIoCallbacks |
||
CloseThreadpoolIo |
(3)新线程池辅助API
功能 |
辅助API |
线程池清理器 |
CreateThreadpoolCleanupGroup |
CloseThreadpoolCleanupGroup |
|
ClosethreadpoolCleanupGroupMembers |
|
线程池控制函数 |
CreateThreadpool |
CloseThreadpool |
|
SetThreadpoolThreadMaximum |
|
SetThreadpoolMinimum |
|
线程池环境设备 |
InitializeThreadpoolEnviroment |
DestroyThreadpoolEnvironment |
|
SetThreadpoolCallbackCleanupGroup |
|
SetThreadCallbackLibrary |
|
SetThreadpoolbackpool |
|
SetThreadpoolCallbackRunsLong |
|
显式设定一个长时间调用的回调线程池函数 |
CallbackMayRunLong |
清理及回调方法 |
DisassociateCurrentThreadFromCallback |
FreeLibraryWhenCallbackReturns |
|
LeaveCriticalSectionWhenCallbackReturns |
|
ReleaseMutexWhenCallbackReturns |
|
ReleaseSemaphoreWhenCallbackReturns |
|
SetEventWhenCallbackReturns |
(3)Win2008新线程池的一般编程模型
11.2.1 以异步方式调用函数
(1)单步使用线程池——TrySubmitThreadpoolCallback函数提交异步函数给线程池
参数 |
描述 |
PTP_SIMPLE_CALLBACK pfnCallback |
回调函数,其原型为: VOID NTAPI SimpleCallback (PTP_CALLBACK_INSTANCE pInstance, //不透明的参数,调用回调函数时,由Windows自动传入,可用于继续传给其他回调终止操作的函数使用,如LeaveCriticalSectionWhenCallbackReturns。。 PVOID pvContext);//其中的pInstanc见“回调函数终止时行为”那部分的内容 |
PVOID pvContext |
回调函数的额外参数。 |
PTP_CALLBACK_ENVIRON pcbe |
回调环境,用来对线程池进行定制的参数。(注意,这个结构体内部与一个线程池关联,该参数为NULL时,会创建默认线程池,否则我们可以用CreateThreadpool来创建一个线程池,并与这个结构体关联起来) |
备注:当为pcbe为NULL时,该函数被调用时系统会在进程中创建一个默认的线程池,并让线程池中的一个线程来调用指定的回调函数。该函数(内部调用PostQueuedCompletionStatus)将一个工作项添加到队列中。 |
(2)两步使用线程池
①CreateThreadpoolWork创建“工作项”。注意与之前所说的那些工作项不同。这里的工作项是个对象,不能简单理解成是一个回调函数。而是关联了回调函数及回调环境的一个对象了!
参数 |
描述 |
PTP_WORK_CALLBACK pfnWorkHandler |
工作项要关联的回调函数,其原型为 VOID CALLBACK WorkCallback( PTP_CALLBACK_INSTNACE,PVOID pvContext,PTP_WORK work); |
PVOID pvContext |
回调函数的额外参数 |
PTP_CALLBACK_ENVIRON pcbe |
回调环境,见《TrySubmitThreadpoolCallback函数》的说明 |
②SubmitThreadpoolWork提交这个工作项给线程池。结束后还可以关闭该工作项。
SubmitThreadpoolWork(PTP_WORK pWork);
【注意】
★WaitForThreadpoolWorkCallbacks(pWork,bCancelPendingCallbacks)取消己提交的工作项或等待工作项处理完成
bCancelPendingCallbacks为FALSE,会等待工作项处理完成,函数再返回。为TRUE时会试图取消pWork这个工作项。
如果用一个PTP_WORK提交了多个工作项,当bCancelPendingCallbacks为FALSE时则会等待所有的己提交的工作项,如果为TRUE只要等待当前正在运行的工作项完成时就返回。
★CloseThreadpoolWork关闭一个工作项
【Batch示例程序】批处理程序
//主程序
/*************************************************************************
Module: Batch.cpp
Notices: Copyright(c) 2008 Jeffrey Ritcher & Christophe Nasarre
*************************************************************************/ #include "..\..\CommonFiles\CmnHdr.h"
#include "resource.h" #include <tchar.h>
#include <strsafe.h> //////////////////////////////////////////////////////////////////////////
//全局变量
HWND g_hDlg = NULL;
PTP_WORK g_pWorkItem = NULL;//工作项对象
volatile LONG g_nCurrentTask = ; //自定义消息
#define WM_APP_COMPLETED (WM_APP + 123) //////////////////////////////////////////////////////////////////////////
void AddMessage(LPCTSTR szMsg){
HWND hListBox = GetDlgItem(g_hDlg, IDC_LB_STATUS);
ListBox_SetCurSel(hListBox, ListBox_AddString(hListBox, szMsg));
} //////////////////////////////////////////////////////////////////////////
void WINAPI TaskHandler(PTP_CALLBACK_INSTANCE pInstance, PVOID pvContext, PTP_WORK pWork){ LONG currentTask = InterlockedIncrement(&g_nCurrentTask); TCHAR szMsg[MAX_PATH];
StringCchPrintf(szMsg, _countof(szMsg),
TEXT("线程[%u]:%u号任务开始. "), GetCurrentThreadId(), currentTask);
AddMessage(szMsg); //模拟许多的工作
Sleep(currentTask * ); StringCchPrintf(szMsg, _countof(szMsg),
TEXT("线程[%u]:%u号任务结束. "), GetCurrentThreadId(), currentTask);
AddMessage(szMsg); if (InterlockedDecrement(&g_nCurrentTask)==){
//通知UI线程任务己经完成
PostMessage(g_hDlg, WM_APP_COMPLETED, , (LPARAM)currentTask);
}
} //////////////////////////////////////////////////////////////////////////
void OnStartBatch(){
//禁用“开始”按钮
Button_Enable(GetDlgItem(g_hDlg, IDC_BTN_START_BATCH), FALSE); AddMessage(TEXT("----开始新的批处理----")); //使用同一个工作项对象,提交6个任务
for (int i = ; i < ;i++){
SubmitThreadpoolWork(g_pWorkItem);
}
//SubmitThreadpoolWork(g_pWorkItem);
//SubmitThreadpoolWork(g_pWorkItem);
//SubmitThreadpoolWork(g_pWorkItem);
//SubmitThreadpoolWork(g_pWorkItem);
//SubmitThreadpoolWork(g_pWorkItem);
} //////////////////////////////////////////////////////////////////////////
BOOL Dlg_OnInitDialog(HWND hwnd, HWND hwndFocus, LPARAM lParam){
g_hDlg = hwnd;
return TRUE;
} void Dlg_OnCommand(HWND hwnd, int id, HWND hwndCtrl, UINT codeNotity){
switch (id)
{
case IDOK:
case IDCANCEL:
EndDialog(hwnd, id);
break; case IDC_BTN_START_BATCH:
OnStartBatch();
break;
}
} //////////////////////////////////////////////////////////////////////////
INT_PTR WINAPI Dlg_Proc(HWND hwnd, UINT uMsg, WPARAM wParam, LPARAM lParam){
switch (uMsg){
chHANDLE_DLGMSG(hwnd, WM_INITDIALOG, Dlg_OnInitDialog);
chHANDLE_DLGMSG(hwnd, WM_COMMAND, Dlg_OnCommand); case WM_APP_COMPLETED:
{
TCHAR szMsg[MAX_PATH + ];
StringCchPrintf(szMsg, _countof(szMsg),
TEXT("----%u号任务是批处理的最后一个任务----"), lParam);
AddMessage(szMsg); //启用“开始”按钮
Button_Enable(GetDlgItem(g_hDlg, IDC_BTN_START_BATCH), TRUE);
} break;
}
return FALSE;
} //////////////////////////////////////////////////////////////////////////
int APIENTRY _tWinMain(HINSTANCE hInst, HINSTANCE, LPTSTR pCmdLine, int){ //创建用于供所有任务使用的工作项对象(最后一个参数为NULL,使用进程默认的线程池!)
g_pWorkItem = CreateThreadpoolWork(TaskHandler, NULL, NULL);
if (NULL == g_pWorkItem){
MessageBox(NULL, TEXT("无法创建任务所需的工作项对象"),
TEXT(""), MB_ICONSTOP);
return -;
} //ttoi,将字符转为整型
DialogBoxParam(hInst, MAKEINTRESOURCE(IDD_MAIN), NULL, Dlg_Proc,_ttoi(pCmdLine)); //关闭工作项对象
CloseThreadpoolWork(g_pWorkItem);
return ;
}
//resource.h
//{{NO_DEPENDENCIES}}
// Microsoft Visual C++ 生成的包含文件。
// 供 11_Batch.rc 使用
//
#define IDD_MAIN 101
#define IDC_LB_STATUS 1001
#define IDC_BTN_START_BATCH 1002 // Next default values for new objects
//
#ifdef APSTUDIO_INVOKED
#ifndef APSTUDIO_READONLY_SYMBOLS
#define _APS_NEXT_RESOURCE_VALUE 102
#define _APS_NEXT_COMMAND_VALUE 40001
#define _APS_NEXT_CONTROL_VALUE 1003
#define _APS_NEXT_SYMED_VALUE 101
#endif
#endif
//Batch.rc
// Microsoft Visual C++ generated resource script.
//
#include "resource.h" #define APSTUDIO_READONLY_SYMBOLS
/////////////////////////////////////////////////////////////////////////////
//
// Generated from the TEXTINCLUDE 2 resource.
//
#include "winres.h" /////////////////////////////////////////////////////////////////////////////
#undef APSTUDIO_READONLY_SYMBOLS /////////////////////////////////////////////////////////////////////////////
// 中文(简体,中国) resources #if !defined(AFX_RESOURCE_DLL) || defined(AFX_TARG_CHS)
LANGUAGE LANG_CHINESE, SUBLANG_CHINESE_SIMPLIFIED #ifdef APSTUDIO_INVOKED
/////////////////////////////////////////////////////////////////////////////
//
// TEXTINCLUDE
// TEXTINCLUDE
BEGIN
"resource.h\0"
END TEXTINCLUDE
BEGIN
"#include ""winres.h""\r\n"
"\0"
END TEXTINCLUDE
BEGIN
"\r\n"
"\0"
END #endif // APSTUDIO_INVOKED /////////////////////////////////////////////////////////////////////////////
//
// Dialog
// IDD_MAIN DIALOGEX , , ,
STYLE DS_SETFONT | DS_MODALFRAME | DS_CENTER | WS_POPUP | WS_CAPTION | WS_SYSMENU
CAPTION "利用线程池进行批处理"
FONT , "宋体", , , 0x86
BEGIN
DEFPUSHBUTTON "退出",IDOK,,,,
LISTBOX IDC_LB_STATUS,,,,,LBS_NOINTEGRALHEIGHT | NOT WS_BORDER | WS_VSCROLL | WS_HSCROLL | WS_TABSTOP,WS_EX_STATICEDGE
PUSHBUTTON "开始批处理",IDC_BTN_START_BATCH,,,,
END /////////////////////////////////////////////////////////////////////////////
//
// DESIGNINFO
// #ifdef APSTUDIO_INVOKED
GUIDELINES DESIGNINFO
BEGIN
IDD_MAIN, DIALOG
BEGIN
LEFTMARGIN,
RIGHTMARGIN,
TOPMARGIN,
BOTTOMMARGIN,
END
END
#endif // APSTUDIO_INVOKED #endif // 中文(简体,中国) resources
///////////////////////////////////////////////////////////////////////////// #ifndef APSTUDIO_INVOKED
/////////////////////////////////////////////////////////////////////////////
//
// Generated from the TEXTINCLUDE 3 resource.
// /////////////////////////////////////////////////////////////////////////////
#endif // not APSTUDIO_INVOKED
11.2.2 每隔一段时间调用一个函数
(1)CreateThreadpoolTimer函数——在线程池中创建一个定时器对象
参数 |
描述 |
PTP_TIMER_CALLBACK PfnTimerCallback |
回调函数指针,其原型为 VOID CALLBACK TimeoutCallback( PTP_CALLBACK_INSTANCE pInstance, PVOID pvContext,PTP_TIMER pTimer); |
PVOID pvContext |
传给回调函数的额外参数 |
PTP_CALLBACK_ENVIRON pcbe |
回调环境 |
返回值 |
计时器工作项,TP_TIMER对象的指针 |
(2)SetThreadpoolTimer:向线程池注册计时器
参数 |
描述 |
PTP_TIMER pTimer |
要设定的计时器的指针 |
PFILETIME pftDueTime |
第1次调用回调函数的时间。 ①NULL:表示停止调用回调函数。(暂停但不销毁计时器) ②小于0:表示相对时间(单位微秒),即相对于调用SetThreadpoolTimer的时间。 ③-1:表示立即开始。 ④正数:以100ns为单位,从1600年1月1日 开始计算的周期数。 |
DWORD msPeriod |
触发周期(单位微秒)。0表示只触发一次。注意如果回调函数执行的时间太长,而回调函数触发的周期又很短,此时系统会启动多个线程来执行这些回调函数。 |
DWORD msWindowLength |
用来给回调函数的执行增加一些随机性。单位微秒 ①设当前设定的触发时刻为T,则下次触发的时间是[T+msPeriod,T+msPeriod+msWindowLength]之间的任何一个时间值。 ②msWindowLength的另一个作用是将计时器分组。如计时器A会在5-7微秒内被触发,计时器B会在6-8微秒内。因时间有重叠,所以线程池只会唤醒一个线程来处理这两个计时器,在处理完A的回调函数后,该线程不进入睡眠,会直接再调用B的回调函数,以减少用两个线程调用时产生的额外的线程上下文切换的开销。 |
(3)IsThreadpoolTimerSet——确定某个计时器是否己经被设置,即pftDueTime!=NULL
(4)WaitForThreadpoolTimerCallbacks等待一个计时器完成。
(5)CloseThreadpoolTimer释放计时器的内存。
【TimedMessageBox示例程序】
/*-----------------------------------------------------------------------------------------
Module: TimedMsgBox.cpp
Notices:Copyright(c) 2008 Jeffrey Richter & Christophe Nasarre
-----------------------------------------------------------------------------------------*/ #include "../../CommonFiles/CmnHdr.h"
#include <tchar.h>
#include <strsafe.h> //////////////////////////////////////////////////////////////////////////
TCHAR g_szCaption[]; //消息框的标题 int g_nSecLeft = ; //消息框中显示的剩余时间 #define ID_MSGBOX_STATIC_TEXT 0x0000FFFF //对话框中文本框的ID,这是系统默认的ID
//////////////////////////////////////////////////////////////////////////
VOID CALLBACK MsgBoxTimeoutCallback(PTP_CALLBACK_INSTANCE pInstance,PVOID pvContext,PTP_TIMER pTimer){
//注意,因竞争条件,当程序运行到这里时,可能对话框还没有创建!
HWND hwnd = FindWindow(NULL, g_szCaption); if (hwnd !=NULL){
if (g_nSecLeft ==){
//时间结束,强迫对话框退出
EndDialog(hwnd, IDOK);
return;
} //如果对话框存在,则更新剩作时间
TCHAR szMsg[];
StringCchPrintf(szMsg, _countof(szMsg), TEXT("还剩%d秒,按确定将取消"), --g_nSecLeft);
SetDlgItemText(hwnd, ID_MSGBOX_STATIC_TEXT, szMsg);
}
} //////////////////////////////////////////////////////////////////////////
int WINAPI _tWinMain(HINSTANCE hInstance,HINSTANCE hPrevInstance, PTSTR lpCmdLine,int nShowCmd){
StringCchPrintf(g_szCaption, _countof(g_szCaption), TEXT("Timed Message Box")); //创建线程池计时器对象
PTP_TIMER lpTimer = CreateThreadpoolTimer(MsgBoxTimeoutCallback, NULL, NULL); if (NULL ==lpTimer){
TCHAR szMsg[MAX_PATH];
StringCchPrintf(szMsg, _countof(szMsg), TEXT("无法创建计时器对象:%u"), GetLastError());
MessageBox(NULL, szMsg, TEXT("错误"), MB_OK | MB_ICONERROR);
return (-);
} //设置定时器1秒后触发,以后每秒触发一次
ULARGE_INTEGER ulRelativeStartTime;
ulRelativeStartTime.QuadPart = (LONGLONG)(-); //单位微秒。转换后为1秒。
FILETIME ftRelativeStartTime;
ftRelativeStartTime.dwHighDateTime = ulRelativeStartTime.HighPart;
ftRelativeStartTime.dwLowDateTime = ulRelativeStartTime.LowPart;
SetThreadpoolTimer(
lpTimer,
&ftRelativeStartTime,
, //每隔一秒触发 ); //显示消息框
MessageBox(NULL, TEXT("还剩10秒,按确定将取消"), g_szCaption, MB_OK); //清除计时器对象
CloseThreadpoolTimer(lpTimer); //判断是用户取消计时或者是超时
MessageBox(NULL, (g_nSecLeft == ) ? TEXT("超时") : TEXT("用户取消"), TEXT("结果"), MB_OK);
return ; }
【NewWorkPool程序】简单的工作项和定时器回调演示
#include <windows.h>
#include <tchar.h>
#include <strsafe.h>
#include <locale.h> //////////////////////////////////////////////////////////////////////////
//简单的工作项函数
VOID CALLBACK MyWorkCallback(PTP_CALLBACK_INSTANCE pInstance,
PVOID pvContext,
PTP_WORK pWork){
BOOL bRet = FALSE;
DWORD dwPriorityOriginal = ; dwPriorityOriginal = GetThreadPriority(GetCurrentThread()); if (THREAD_PRIORITY_ERROR_RETURN == dwPriorityOriginal){
_tprintf(_T("GetThreadPriority失败。错误码:%u\n"), GetLastError());
return;
} bRet = SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL); if (FALSE == bRet){
_tprintf(_T("SetThreadPriority失败。错误码:%u\n"), GetLastError());
return;
} _tprintf(_T("[ID:0x%X] MyWorkCallback Runing...\n"), GetCurrentThreadId()); bRet = SetThreadPriority(GetCurrentThread(), dwPriorityOriginal); if (FALSE == bRet){
_tprintf(_T("SetThreadPriority失败。错误码:%u\n"), GetLastError());
return;
}
return;
} //简单的定时回调函数
VOID CALLBACK MyTimerCallback(PTP_CALLBACK_INSTANCE pInstance,
PVOID pvContext,
PTP_TIMER pTimer){
_tprintf(_T("[ID:0x%X] MyTimerCallback Runing...\n"), GetCurrentThreadId());
}
//////////////////////////////////////////////////////////////////////////
int _tmain(){
_tsetlocale(LC_ALL, _T("chs")); BOOL bRet = FALSE;
PTP_WORK pWork = NULL;
PTP_TIMER pTimer = NULL;
PTP_POOL pPool = NULL;
PTP_WORK_CALLBACK workcallback = MyWorkCallback;
PTP_TIMER_CALLBACK timercallback = MyTimerCallback;
TP_CALLBACK_ENVIRON CallbackEnviron;
PTP_CLEANUP_GROUP cleanupgroup = NULL;
FILETIME ftDueTime;
ULARGE_INTEGER ulDueTime; UINT rollback = ; __try{
//初始化环境块
InitializeThreadpoolEnvironment(&CallbackEnviron); //创建线程池
pPool = CreateThreadpool(NULL); if (NULL == pPool){
_tprintf(_T("创建线程池失败!错误码:%u\n"), GetLastError());
__leave;
} rollback = ;//创建线程池成功! //设置线程数
SetThreadpoolThreadMaximum(pPool, );
bRet = SetThreadpoolThreadMinimum(pPool, ); if (!bRet){
_tprintf(_T("SetThreadpoolThreadMinimum失败,错误码:%u\n"), GetLastError());
__leave;
} //创建资源清理器
cleanupgroup = CreateThreadpoolCleanupGroup();
if (NULL == cleanupgroup){
_tprintf(_T("CreateThreadpoolCleanupGroup失败!错误码:%u\n"), GetLastError());
__leave;
} rollback = ; //资源清理器创建成功 //将环境块与线程池关联
SetThreadpoolCallbackPool(&CallbackEnviron, pPool); //将清理器与环境块联联
SetThreadpoolCallbackCleanupGroup(&CallbackEnviron, cleanupgroup,NULL); //创建线程池需要的回调函数,这里是一个普通的工作项
pWork = CreateThreadpoolWork(workcallback, NULL, &CallbackEnviron); if (NULL == pWork){
_tprintf(_T("创建线程池普通工作项失败!错误码:%u\n"), GetLastError());
__leave;
} rollback = ; //创建普通工作项成功
SubmitThreadpoolWork(pWork); //提交工作项 //创建一个定时回调项
pTimer = CreateThreadpoolTimer(timercallback, NULL, &CallbackEnviron);
if (NULL == pTimer){
_tprintf(_T("创建线程池计时器对象失败!错误码:%u\n"), GetLastError());
__leave;
} rollback = ; //计时器对象创建成功 //设定定时回调周期
ulDueTime.QuadPart = (LONGLONG)-( * * * ); //1秒以后触发
ftDueTime.dwHighDateTime = ulDueTime.HighPart;
ftDueTime.dwLowDateTime = ulDueTime.LowPart; SetThreadpoolTimer(pTimer, &ftDueTime, , ); //只调用一次 //主线程进入等待状态或干别的工作
Sleep(); //当所有的线程池回调函数都被执行后,关闭清理器
CloseThreadpoolCleanupGroupMembers(cleanupgroup, FALSE, NULL); //事务标志,回滚到第2步,执行第2步后的销毁工作
rollback = ;
__leave; }
__finally{
switch (rollback)
{
case :
case :
//关闭清理器
CloseThreadpoolCleanupGroupMembers(cleanupgroup, FALSE, NULL);
break; case :
//关闭清理器
CloseThreadpoolCleanupGroup(cleanupgroup);
break; case :
//关闭线程池
CloseThreadpool(pPool);
break; default:
break;
}
} _tsystem(_T("PAUSE")); return ;
}
11.2.3 在内核对象触发时调用一个函数
(1)CreateThreadpoolWait——创建一个线程池等待对象(也是一个工作项,等待项)
参数 |
描述 |
PTP_WAIT_CALLBACK pfnWaitCallback |
回调函数指针,其原型为 VOID CALLBACK WaitCallback( PTP_CALLBACK_INSTANCE pInstance, PVOID pvContext,PTP_WAIT pWait, TP_WAIT_RESULT WaitResult); |
PVOID pvContext |
传给回调函数的额外参数 |
PTP_CALLBACK_ENVIRON pcbe |
回调环境 |
返回值 |
等待对象的指针 |
备注:回调函数的WaitResult表示回调函数被调用的原因: WAIT_OBJECT_0:表示传给SetThreadpoolWait的内核对象在超时之前被触发。 WAIT_TIMEOUT:表示内核对象在超时之前没被触发,回调函数被执行是因为超时 WAIT_ABANDONED_0:表示内核对象一个互斥量,并且互斥量被“遗弃”,触发了回调函数。 |
(2)SetThreadpoolWait——将某个内核对象绑定到线程池
参数 |
描述 |
PTP_WAIT pWaitItem |
传入由CreateThreadWait返回的对象的指针 |
HANDLE hObject |
要绑定的内核对象,当该对象被触发时,会调用线程池中的WaitCallback函数。 |
PFILETIME pftTimeout |
线程池愿意花的最长多少时间来等待内核对象触发。 0:立即返回,负值为相对时间,正值为绝对时间,NULL表示无限等待。(线程池内部调用了WaitForMultipleObjects) |
备注:①线程池内部让一个线程调用WaitForMultipleObjects并传入由SetThreadpoolWait函数注册的句柄,不断地组成一个句柄组,同时将Wait*函数的bWaitAll设为FALSE,这样当任何一个句柄被触发,线程池就会被唤醒。 ②因WaitForMultipleObjects不允许将同一个句柄传入多次,因此必须确保不会用SetThreadpoolWait来多次注册同一个句柄,但可以调用DuplicationHandle复制句柄并传给Set*函数。 ③因WaitForMultipleObjects一次最多只能等待64个内核对象,因此线程池实际上为每64个内核对象分配一个线程来等待,所以效率比较高。因此,如果要等待超过64个以上的内核对象,可以考虑用这种线程池,因为系统会每64个内核对象,就开辟一个线程来等待这些内核对象。 ④一旦线程池中一个线程调用了我们的回调函数,对应的等待项将进入“不活跃”状态。这意味着如果在同一个内核对象被触发时再次调用这个回调函数时,需要调用SetThreadpoolWait再次注册。如果传入的hObject为NULL,将把pWaitItem这个等待项从线程中移除。 |
(3)WaitForThreadpoolWaitCallbacks:等待一个等待项完成
(4)ClosethreadpoolWait函数:释放一个等待项的内存
【NewWaitCallback程序】演示触发内核对象时,调用一个函数
#include <windows.h>
#include <tchar.h>
#include <strsafe.h>
#include <locale.h> //////////////////////////////////////////////////////////////////////////
VOID CALLBACK MyWaitCallback(PTP_CALLBACK_INSTANCE pInstance,
PVOID pvContext,
PTP_WAIT pWait,
TP_WAIT_RESULT WaitResult){
_tprintf(_T("线程[ID:0x%X] MyWaitCallback Runing...\n"), GetCurrentThreadId());
}
//////////////////////////////////////////////////////////////////////////
int _tmain(){ _tsetlocale(LC_ALL, _T("chs")); PTP_WAIT pWait = NULL;
PTP_WAIT_CALLBACK pfnWaitCallback = MyWaitCallback;
HANDLE hEvent = NULL;
UINT rollback = ; //创建一个事件对象
hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); //自动重置
if (NULL == hEvent)
return ; rollback = ; //创建事件对象成功 __try{
//创建等待线程池
pWait = CreateThreadpoolWait(pfnWaitCallback, NULL, NULL);//利用系统默认线程池
if (NULL == pWait){
_tprintf(_T("CreateThreadpoolWait失败。错误码:%u\n"), GetLastError());
__leave;
} rollback = ; //模拟等待5次,注意每次等待前要调用SetThreadpoolWait方法
for (int i = ; i < ;i++){
SetThreadpoolWait(pWait, hEvent, NULL); //这句很重要
SetEvent(hEvent);
Sleep(); //主线程等待回调线程池调用完毕
WaitForThreadpoolWaitCallbacks(pWait, TRUE);
}
}__finally{
switch (rollback)
{
case :
SetThreadpoolWait(pWait, NULL, NULL);//取消等待项
CloseThreadpoolWait(pWait);
break;
case :
CloseHandle(hEvent);
break;
default:
break;
}
} _tsystem(_T("PAUSE"));
return ;
}
11.2.4 在异步I/O请求完成时调用一个函数
(1)CreateThreadpoolIo:创建线程池Io对象
参数 |
描述 |
HANDLE hDevice |
要关联的设备句柄 |
PTP_WIN32_IO_CALLBACK pfnIoCallback |
回调函数指针,其原型为 VOID CALLBACK OverlappedCompletionRoutine( PTP_CALLBACK_INSTANCE pInstance, PVOID pvContext,PVOID pOverlapped, ULONG IoResult,//操作结果,成功时为NO_ERROR ULONG_PTR NumberOfBytesTransferred,//已传输字节数 PTP_IO //指向线程池中的I/O项的指针, //即CreateThreadpoolIo的返回值 ); |
PVOID pvContext |
传给回调函数的额外参数 |
PTP_CALLBACK_ENVIRON pcbe |
回调环境 |
返回值 |
I/O对象(一个工作项)的指针 |
(2)将IO对象(工作项)与线程池内部的I/O完成端口关联
VOID StartThreadpoolIo(PTP_IO pio);
【注意】每次调用ReadFile和Writefile之前都必调用StartThreadpoolIo,否则回调函数不会被调用(这步相当于给完成端口增加IO完成项通知)
(3)停止线程池调用回调函数:VOID CancelThreadpoolIo(PTP_IO pio);
【注意】①当发出IO请求之后,可以用这来取消。
②在调用ReadFile或WriteFile失败时,仍然必须调用CancelThreadpoolIo(除了返回FALSE且GetLastError为ERROR_IO_PENDING,因这表示正在完成)
(4)等待一处待处理的IO请求完成。
WaitForThreadIoCallbacks(pio,bCancelPendingCallbacks);
【注意】
①该函数须在另一个线程使用,而不能在回调函数内部使用,因为这会造成死锁。
②如果bCancelPendingCallbacks为TRUE,那么当请求完成的时候,回调函数不会被调用(如果尚未被调用)。这和调用CancelThreadpoolIo函数的时候很相似。
(5)解除IO对象(工作项)与线程池的关联:VOID CloseThreadpoolIo(PTP_IO pio);
【NewIOCPPool程序】模拟写入日志文件
效果图与【IOCPPool程序】一致。
#include <windows.h>
#include <tchar.h>
#include <strsafe.h>
#include <locale.h> //////////////////////////////////////////////////////////////////////////
#define QMLX_ALLOC(sz) HeapAlloc(GetProcessHeap(),0,sz) //QMLX:浅墨浓香
#define QMLX_CALLOC(sz) HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sz)
#define QMLX_SAFEFREE(p) if(NULL != p){HeapFree(GetProcessHeap(),0,p);p=NULL;} #define QMLX_ASSERT(s) if(!(s)){DebugBreak();}
#define QMLX_BEGINTHREAD(Fun,Param) CreateThread(NULL,0,\
(LPTHREAD_START_ROUTINE)Fun,Param,,NULL); //////////////////////////////////////////////////////////////////////////
#define MAXWRITEPERTHREAD 20 //每个线程最大写入次数
#define MAXWRITETHREAD 10 //写入线程的数量 #define OP_READ 0x01 //读操作
#define OP_WRITE 0x02 //写操作 //单IO数据
typedef struct _tagPerIoData{
OVERLAPPED m_ol;
HANDLE m_hFile; //操作的文件句柄
DWORD m_dwOp; //操作类型,OP_READ或OP_WRITE
LPVOID m_pData; //操作的数据
UINT m_nLen; //操作的数据长度
DWORD m_dwWrite; //写入的字节数
DWORD m_dwTimestamp; //起始操作的时间戳
}PER_IO_DATA,*PPER_IO_DATA; //IOCP线程池回调函数,实际就是完成通知的响应函数
VOID CALLBACK IOCPCallback(PTP_CALLBACK_INSTANCE pInstance,PVOID pvContext,PVOID pOverlapped,
ULONG IoResult,ULONG_PTR NumberOfBytesTransferred,PTP_IO pio); //写文件的线程
DWORD WINAPI WriteThread(LPVOID lpParam); //当前操作的文件对象的指针
LARGE_INTEGER g_liFilePointer = { }; //IOCP线程池
PTP_IO g_pThreadpoolIo = NULL; //////////////////////////////////////////////////////////////////////////
//获取可模块的路径名(路径后含‘\’)
VOID GetAppPath(LPTSTR pszBuffer){
DWORD dwLen = ;
if ( == (dwLen = GetModuleFileName(NULL, pszBuffer, MAX_PATH)))
return; for (DWORD i = dwLen; i > ;i--){
if ('\\' == pszBuffer[i]){
pszBuffer[i + ] = '\0';
break;
}
}
} int _tmain(){
_tsetlocale(LC_ALL, _T("chs"));
TCHAR pFileName[MAX_PATH] = {};
GetAppPath(pFileName);
StringCchCat(pFileName, MAX_PATH, _T("NewIOCPFile.txt")); HANDLE ahWThread[MAXWRITETHREAD] = {};
DWORD dwWrited = ; //创建文件
HANDLE hTxtFile = CreateFile(pFileName, GENERIC_WRITE, , NULL,
CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, NULL);
if (INVALID_HANDLE_VALUE == hTxtFile){
_tprintf(_T("CreateFile(%s)失败,错误码:%u\n"), GetLastError());
_tsystem(_T("PAUSE"));
return ;
} //初始化线程池回调环境
TP_CALLBACK_ENVIRON poolEnv = {};
InitializeThreadpoolEnvironment(&poolEnv); //创建IOCP线程池
g_pThreadpoolIo = CreateThreadpoolIo(hTxtFile, (PTP_WIN32_IO_CALLBACK)IOCPCallback,hTxtFile,&poolEnv); //启动IOCP线程池
StartThreadpoolIo(g_pThreadpoolIo); //写入UNICODE文件的前缀码,以便正确打开
PER_IO_DATA* pIo = (PPER_IO_DATA)QMLX_CALLOC(sizeof(PER_IO_DATA));
QMLX_ASSERT(pIo != NULL); pIo->m_dwOp = OP_WRITE;
pIo->m_hFile = hTxtFile;
pIo->m_pData = QMLX_CALLOC(sizeof(WORD));
QMLX_ASSERT(pIo->m_pData != NULL);
*((WORD*)pIo->m_pData) = MAKEWORD(0xFF, 0xFE);
pIo->m_nLen = sizeof(WORD); //偏移文件指针
pIo->m_ol.Offset = g_liFilePointer.LowPart;
pIo->m_ol.OffsetHigh = g_liFilePointer.HighPart;
g_liFilePointer.QuadPart += pIo->m_nLen;
pIo->m_dwTimestamp = GetTickCount(); //记录时间戳 WriteFile(hTxtFile, pIo->m_pData, pIo->m_nLen,
&pIo->m_dwWrite,(LPOVERLAPPED)&pIo->m_ol); //等待IOCP线程池完成操作
WaitForThreadpoolIoCallbacks(g_pThreadpoolIo, FALSE); //启动写入线程进行日志写入操作
for (int i = ; i < MAXWRITETHREAD;i++){
ahWThread[i] = QMLX_BEGINTHREAD(WriteThread, hTxtFile);
} //让主线程等待这些写入线程结束
WaitForMultipleObjects(MAXWRITETHREAD, ahWThread, TRUE, INFINITE); for (int i = ; i < MAXWRITETHREAD;i++){
CloseHandle(ahWThread[i]);
} //关闭IOCP线程池
CloseThreadpoolIo(g_pThreadpoolIo); //关闭日志文件
if (INVALID_HANDLE_VALUE != hTxtFile){
CloseHandle(hTxtFile);
hTxtFile = INVALID_HANDLE_VALUE;
} _tsystem(_T("PAUSE")); return ;
} //IOCP线程池回调函数,实际就是完成通知的响应函数
VOID CALLBACK IOCPCallback(PTP_CALLBACK_INSTANCE pInstance, PVOID pvContext, PVOID pOverlapped,
ULONG IoResult, ULONG_PTR NumberOfBytesTransferred, PTP_IO pio)
{
if (NO_ERROR != IoResult){
_tprintf(_T("I/O操作出错,错误码:%u\n"),IoResult);
return;
} PPER_IO_DATA pIo = CONTAINING_RECORD((LPOVERLAPPED)pOverlapped, PER_IO_DATA, m_ol);
DWORD dwCurTimestamp = GetTickCount(); switch (pIo->m_dwOp)
{
case OP_WRITE://写操作结束
{//写入操作结束
_tprintf(_T("线程[0x%x]得到IO完成通知,完成操作(%s),缓冲(0x%08x)长度(%ubytes),写入时间戳(%u)当前时间戳(%u)时差(%u)\n"),
GetCurrentThreadId(), OP_WRITE == pIo->m_dwOp ? _T("Write") : _T("Read"),
pIo->m_pData, pIo->m_nLen, pIo->m_dwTimestamp, dwCurTimestamp, dwCurTimestamp - pIo->m_dwTimestamp); QMLX_SAFEFREE(pIo->m_pData);
QMLX_SAFEFREE(pIo);
}
break; case OP_READ: //读操作结束
break; default:
break;
}
} //写文件的线程
#define MAX_LOGLEN 256
DWORD WINAPI WriteThread(LPVOID lpParam)
{
TCHAR pTxtContext[MAX_LOGLEN] = {};
PPER_IO_DATA pIo = NULL;
size_t szLen = ;
LPTSTR pWriteText = NULL; StringCchPrintf(pTxtContext, MAX_LOGLEN, _T("这是一条模拟的日志记录,由线程[0x%x]写入\r\n"),
GetCurrentThreadId());
StringCchLength(pTxtContext, MAX_LOGLEN, &szLen); szLen += ; int i = ;
for (; i < MAXWRITEPERTHREAD;i++){
pWriteText = (LPTSTR)QMLX_CALLOC(szLen*sizeof(TCHAR));
QMLX_ASSERT(NULL != pWriteText);
StringCchCopy(pWriteText, szLen, pTxtContext); //为每个操作申请一个“单IO数据”结构体
pIo = (PPER_IO_DATA)QMLX_CALLOC(sizeof(PER_IO_DATA));
QMLX_ASSERT(pIo != NULL); pIo->m_dwOp = OP_WRITE;
pIo->m_hFile = (HANDLE)lpParam;
pIo->m_pData = pWriteText;
pIo->m_nLen = (szLen-)*sizeof(TCHAR); //这里使用原子操作同步文件指针,写入不会相互覆盖
//这个地方体现了lock-free算法的精髓,使用了基本的CAS操作控制文件指针
//比传统的使用关键代码段并等待的方法,这里用的方法要轻巧的多,付出的代价也小
*((LONGLONG*)&pIo->m_ol.Pointer) = InterlockedCompareExchange64(&g_liFilePointer.QuadPart,
g_liFilePointer.QuadPart + pIo->m_nLen, g_liFilePointer.QuadPart);
pIo->m_dwTimestamp = GetTickCount(); //记录时间戳 StartThreadpoolIo(g_pThreadpoolIo); //写入
WriteFile((HANDLE)lpParam, pIo->m_pData, pIo->m_nLen,
&pIo->m_dwWrite, (LPOVERLAPPED)&pIo->m_ol);
if (ERROR_IO_PENDING != GetLastError()){
CancelThreadpoolIo(g_pThreadpoolIo);
}
} return i;
}
11.2.5 回调函数的终止操作
(1)回调函数的pInstance参数:当线程调用回调函数时,Windows会自动传一个pInstance参数(类型PTP_CALLBACK_INSTANCE)给回调函数,然后回调函数将这个参数又传给如下的函数,以便在这些函数在回调函数完后,执行一些相应的终止操作(主要是用来通知另一个线程,线程池中的工作项己经完成。显然,如下函数是在回调函数的内部进行调用的!)
函数 |
终止操作 |
LeaveCriticalSectionWhenCallbackReturns |
当回调函数返回时,线程池会自动调用LeavCriticalSection,并在参数中传入指定的CRITCAL_SECTION结构体。 |
ReleaseMutexWhenCallbackReturns |
当回调函数返回时,线程池会自动调用ReleaseMutex,并在参数中传入指定的HANDLE。 |
ReleaseSemaphoreWhenCallbackReturns |
当回调函数返回的时候,线程池会自动调用ReleaseSemaphore,并在参数中传入指定的HANDLE |
SetEventWhenCallbackReturns |
当回调函数返回的时候,线程池会自动调用SetEvent,并在参数中传入指定的HANDLE。 |
FreeLibraryWhenCallbackReturns |
当回调函数返回的时候,线程池会自动调用FreeLibrary,并在参数中传入指定的HMOUDLE。 (注意:如果回调函数是从DLL中载入的,这个函数尤为重要,因为当线程执行完毕后,回调函数不能自己调用FreeLibrary,否则回调函数代码将从进程中清除,这样当FreeLibrary试图返回到回调函数时,会引发访问违规) |
注意,对于任何一个回调函数,只能执行上述的一种终止操作。如果调用了以上的多个函数,则最后调用的终止函数会覆盖之前调用的那个。 |
(2)BOOL WINAPI CallbackMayRunLong(PTP_CALLBACK_INSTANCE pci);用来通知线程池回调函数可能运行的时间会比较长。返回TRUE时,说明线程池还有其他线程可供使用。FALSE则相反。线程池会根据来决定是否创建新线程,以防止其他工作项出现挨饿现象。(只能在调用线程的回调函数里使用!)
(3)DisassociateCurrentThreadFromCallback(PTP_CALLBACK_INSTANCE pci)
用来告诉线程池,逻辑上自己已经完成了工作。这使得任何由于调用WaitForThreadpool*Callbacks(如WaitForThreadpoolIoCallbacks)而被阻塞的线程能早一些返回,而不必等到线程从回调函数中结束时才返回。