ReadDirectoryChangesW---异步方式(IO完成端口)监控目录中的文件

时间:2025-04-17 14:23:51

同步方式处理的话如果多个文件有可能有处理不到的文件。采用异步方式可以解决这个问题。

基于IO完成端口实现的封装类class P2PFileShare:

 

#if !defined(AFX_P2PFILESHARE_H__5CFE59EF_7A09_4715_885F_FED4E2992470__INCLUDED_)
#define AFX_P2PFILESHARE_H__5CFE59EF_7A09_4715_885F_FED4E2992470__INCLUDED_

#include ""

#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000

#define _WIN32_IE            0x0501 
  
#define _WIN32_DCOM 
    
class P2PFileShare 
{ 
	typedef struct 
	{ 
		OVERLAPPED  ov; 
		BYTE        buff[65536]; 
		LPTSTR      path; 
	    DWORD       flag; 
		HANDLE      handle; 
	}PATH_OV, *LPPATH_OV; 

public : 
    P2PFileShare() 
    :mh_IOCP(NULL) 
    ,mn_OVPtr(0) 
    ,mp_OVPtr(NULL) 
    { 
		::InitializeCriticalSection(&m_cs);
    } 
  
    virtual~P2PFileShare() 
    { 
		Close(TRUE); 

		::DeleteCriticalSection(&m_cs);
    } 
private:  
    HRESULT _CreateWorkerThread(); // 创建工作线程
   
#ifndef _WIN32_WCE 
    static UINT WINAPI _WorkerThreadProc(IN LPVOID pData); // 工作线程
#else 
    static DWORD WINAPI _WorkerThreadProc(IN LPVOID pData); // 工作线程
#endif  

    HRESULT _WorkerThreadProc(); 
    char* w2c(char *pcstr,const wchar_t *pwstr, size_t len);
public : 
    HRESULT Start(); 
    HRESULT MonitorPath(IN LPCTSTR sFileName); //监视指定目录
	void    Close(IN CONST BOOL bWait = FALSE); 

private : 
    HANDLE     mh_IOCP; 
    LONG       mn_OVPtr; 
    LPPATH_OV* mp_OVPtr; 
	CRITICAL_SECTION m_cs;
  
public : 
	inline void EnterLock(){::EnterCriticalSection(&m_cs);} 
	inline void LeaveLock(){::LeaveCriticalSection(&m_cs);}         
}; 

#endif // !defined(AFX_P2PFILESHARE_H__5CFE59EF_7A09_4715_885F_FED4E2992470__INCLUDED_)

 

// : implementation of the P2PFileShare class.
//
//
#include ""
#include ""
//
// Construction/Destruction
//
// 创建工作线程 ( 根据 CPU 的数量,创建相应数量的工作线程 ) 
HRESULT P2PFileShare::_CreateWorkerThread() 
{ 
	HRESULT hr = E_FAIL; 

	HANDLE hThread; 
#ifndef _WIN32_WCE 
    if((hThread = (HANDLE)_beginthreadex(NULL,0,_WorkerThreadProc,(LPVOID)this,0,NULL)) == 0) 
    { 
        return _doserrno; 
    } 
#else 
    if((hThread = (HANDLE)::CreateThread(NULL,0,_WorkerThreadProc,(LPVOID)this,0,&NULL)) == 0) 
    { 
        return ::GetLastError(); 
    } 
#endif 
    ::CloseHandle(hThread);      // 关闭句柄避免资源泄漏 
    hr = S_OK; 
  
    return hr; 
} 
  
// 工作线程 
#ifndef _WIN32_WCE 
UINT P2PFileShare::_WorkerThreadProc(IN LPVOID pData) 
#else 
DWORD P2PFileShare::_WorkerThreadProc (IN LPVOID pData) 
#endif      // #ifndef _WIN32_WCE 
{ 
    (( P2PFileShare *)pData)->_WorkerThreadProc(); 
  
#ifndef _WIN32_WCE 
    _endthreadex(0); 
#else 
    ExitThread(0); 
#endif 
    return 0; 
} 
  

HRESULT P2PFileShare::_WorkerThreadProc() 
{   
	char szlog[MAX_PATH] = {0};
    BOOL     bSucceed; 
    DWORD    dwBytes,dwNextEntryOffset(0);                      
    LPDWORD  pCT; 
    PATH_OV* pOV; 
	char szFileName[MAX_PATH] = {0};
  
    while (true)
    { 
        bSucceed = ::GetQueuedCompletionStatus(mh_IOCP,&dwBytes,(LPDWORD)&pCT,(LPOVERLAPPED*)&pOV,INFINITE); 
        if (bSucceed) 
        { 
            if (NULL == pOV)
			{
				break;          
			}

            FILE_NOTIFY_INFORMATION * pfiNotifyInfo = (FILE_NOTIFY_INFORMATION*)pOV->buff; 
  
            do 
            { 
                dwNextEntryOffset = pfiNotifyInfo->NextEntryOffset ; 
   
                switch (pfiNotifyInfo->Action) 
                { 
					case FILE_ACTION_REMOVED : //文件删除        
						{ 
							w2c(szFileName,pfiNotifyInfo->FileName, sizeof(szFileName));
							printf("删除了一个文件:%s\n",szFileName);
						} 
						break ; 
					case FILE_ACTION_ADDED : //文件添加          
						{ 
							w2c(szFileName,pfiNotifyInfo->FileName, sizeof(szFileName));
							printf("添加了一个文件:%s\n",szFileName);
						} 
						break;
					case FILE_ACTION_MODIFIED :  //文件修改       
						{ 
							w2c(szFileName,pfiNotifyInfo->FileName, sizeof(szFileName));
							printf("修改了一个文件:%s\n",szFileName);
						} 
						break ; 
					case FILE_ACTION_RENAMED_OLD_NAME : //文件改名                     
						{
							w2c(szFileName,pfiNotifyInfo->FileName, sizeof(szFileName));
							printf("改名了一个文件:%s(OLD_NAME)\n",szFileName);
						}
						break;
					case FILE_ACTION_RENAMED_NEW_NAME: //文件改名:新名字
						{    
							w2c(szFileName,pfiNotifyInfo->FileName, sizeof(szFileName));
							printf("改名了一个文件:%s(NEW_NAME)\n",szFileName); 
						} 
						break ; 
					default:
						break;
                } 
                            
                if (dwNextEntryOffset != 0) 
                { 
                    pfiNotifyInfo = (FILE_NOTIFY_INFORMATION*)((BYTE*)pfiNotifyInfo + dwNextEntryOffset); 
                } 

            }while(dwNextEntryOffset != 0); 
                     
            ::ZeroMemory (pOV->buff,65536);            
            ::ReadDirectoryChangesW (pOV ->handle 
                                    ,pOV ->buff 
                                    ,65536 
                                    ,FALSE 
                                    ,pOV ->flag 
                                    ,NULL 
                                    ,(OVERLAPPED*)pOV 
                                    ,NULL); 
        } 
    } 
  
    EnterLock(); 
  
    while(mn_OVPtr > 0) 
    { 
		::InterlockedDecrement(&mn_OVPtr); 
		pOV = mp_OVPtr[mn_OVPtr]; 

		::CloseHandle(pOV->handle); 
		delete[]pOV->path; 
		delete  pOV; 
    } 
    delete [] mp_OVPtr; 
    mp_OVPtr = NULL ; 
  
    ::CloseHandle(mh_IOCP); 
    mh_IOCP = NULL; 
  
    LeaveLock (); 
  
    return S_OK; 
} 
  
HRESULT P2PFileShare::Start() 
{ 
    HRESULT hr = S_OK; 
  
    EnterLock(); 
  
	if (mh_IOCP)
	{
		return E_FAIL;
	}
  
    // 创建完成端口 
    mh_IOCP = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,NULL,0); 
    if ( NULL == mh_IOCP ) 
    {
        hr = ::GetLastError(); 
    } 
    else 
    { 
        hr = _CreateWorkerThread(); 
        if (S_OK != hr) 
        { 
            ::CloseHandle(mh_IOCP); 
            mh_IOCP = NULL ; 
        } 
    } 
  
    LeaveLock(); 
  
    return hr ; 
} 
  

void P2PFileShare::Close(IN CONST BOOL bWait) 
{ 
	/*
	    The PostQueuedCompletionStatus function posts an I/O completion packet to an I/O completion port.
		此函数的参数2、参数3、参数4、分别被递送到GetQueuedCompletionStatus函数的参数2、3、4.
    */

    ::PostQueuedCompletionStatus(mh_IOCP,0,NULL,NULL); // 通知工作线程关闭 
  
    EnterLock(); 
  
    //先取消所有的IO操作,否则有可能会有内存问题 
    PATH_OV* pOV; 
    for (LONG i = 0;i < mn_OVPtr;i ++) 
    { 
        pOV = mp_OVPtr[i]; 
        ::CancelIo(pOV->handle); 
    } 
  
    LeaveLock(); 
  
    if (bWait) 
    { 
        while(mn_OVPtr > 0) ::Sleep(10); 
    } 
} 
  
  
// 监视共享目录 
HRESULT P2PFileShare::MonitorPath(IN LPCTSTR sPath) 
{ 
    if ( NULL == mh_IOCP )
	{
		return E_FAIL ; 
	}

    PATH_OV* pOV = new PATH_OV; 
      
	if ( NULL == pOV )
	{
		return E_OUTOFMEMORY; 
	}

    ::ZeroMemory(pOV,sizeof(PATH_OV)); 
       
	pOV->path = new char[MAX_PATH];
	strcpy(pOV->path,sPath);

    if(NULL == pOV->path) 
    { 
        delete pOV; 
    } 
  
    // 创建目录句柄 
    pOV->handle = ::CreateFile(
		pOV->path           //获取目录的句柄
       ,FILE_LIST_DIRECTORY   //ReadDirectoryChangesW需要此权限
       ,FILE_SHARE_READ|FILE_SHARE_WRITE|FILE_SHARE_DELETE 
       ,NULL 
       ,OPEN_EXISTING 
       ,FILE_FLAG_BACKUP_SEMANTICS|FILE_FLAG_OVERLAPPED //标志1用来打开一个目录的句柄,标志2是
       ,NULL);                                          //CreateIoCompletionPort函数要求(异步)

    if (INVALID_HANDLE_VALUE == pOV->handle) 
    { 
		DWORD dwErr = GetLastError();
            
        delete [] pOV->path ; 
        delete pOV ; 

		return dwErr; 
    } 
       
    //将目录句柄和IO完成端口进行绑定 
    if ( NULL == ::CreateIoCompletionPort(pOV->handle,mh_IOCP,NULL,0)) 
    { 
		::CloseHandle(pOV->handle); 
		delete [] pOV->path; 
		delete pOV ; 
		return ::GetLastError(); 
    } 

	/*
	  返回值:如果函数成功,返回值就是非0。对于同步调用,这意味着操作成功,对于异步调用,
	  这意味着操作成功地排队。如果函数失败,返回值是0。如果操作目录或文件系统不支持这个操作,
	  函数将返回ERROR_INVALID_FUNCTION,可以使用GetLastError()函数获取。
	*/

	 pOV -> flag = FILE_NOTIFY_CHANGE_FILE_NAME; 
  
	BOOL bSucceed = :: ReadDirectoryChangesW (
							pOV -> handle //监控目录的句柄
						   ,pOV -> buff   //存储修改信息的首地址
						   ,65536         //分配的存储修改信息的空间的长度
						   ,FALSE         //TRUE则监控子目录,FALSE则不监控
						   ,pOV -> flag   //检查的筛选条件,以确定等待操作是否已完成。
						   ,NULL          //该函数返回信息的字节数,也就是存到lpBuffer中的内容的字节数
						   ,(OVERLAPPED*) pOV //一个指向OVERLAPPED结构的指针,他在同步调用时提供数据供使用,否则他就为NULL
						   ,NULL); //当操作结束或被取消或者线程进入等待状态时的一个指向将被调用操作的指针

/*
    其中buff的结构如下:
	typedef struct _FILE_NOTIFY_INFORMATION {  
					DWORD NextEntryOffset;  
					DWORD Action;  
					DWORD FileNameLength;  
					WCHAR FileName[1];
	} FILE_NOTIFY_INFORMATION, 
	*PFILE_NOTIFY_INFORMATION;

	参数1://Number of bytes that must be skipped to get to the next record.
	      //A value of zero indicates that this is the last record.

    参数2://Type of change that occurred.

	参数3://Length of the file name portion of the record, in bytes.
	       //Note that this length does not include the terminating null character.

    参数4://Variable-length field that contains the file name relative to the directory handle. /
		   //The file name is in the Unicode character format and is not null-terminated.
*/

    if (!bSucceed) 
    { 
		::CloseHandle(pOV->handle); 
		delete []pOV->path; 
		delete pOV; 
		return ::GetLastError(); 
    } 
  
    HRESULT hr = S_OK; 
    LONG i; 
    EnterLock(); 
    for(i =0;i < mn_OVPtr;i++) 
    { 
        if(strcmp(mp_OVPtr[i]->path,pOV->path) == 0) 
        { 
            hr = S_FALSE; 
            break; 
        } 
    } 
       
    if (i >= mn_OVPtr) 
    { 
        LPPATH_OV * pOVPtr = new LPPATH_OV[mn_OVPtr + 1]; 
        if (NULL == pOVPtr) 
        { 
            hr = E_OUTOFMEMORY; 
        } 
        else 
        { 
            if(mp_OVPtr != NULL) 
            { 
                ::CopyMemory(pOVPtr,mp_OVPtr,sizeof(LPPATH_OV)*mn_OVPtr); 
                delete [] mp_OVPtr; 
            } 
                     
            pOVPtr[mn_OVPtr] = pOV; 
            mp_OVPtr = pOVPtr; 
            ::InterlockedIncrement(&mn_OVPtr); 
        } 
    } 
    LeaveLock(); 
  
    if (S_OK != hr) 
    { 
        ::CloseHandle(pOV->handle); 
        delete [] pOV->path; 
        delete pOV; 
        return hr; 
    } 
  
    return S_OK; 
} 
  
char* P2PFileShare::w2c(char *pcstr,const wchar_t *pwstr, size_t len)
{
	int nlength = wcslen(pwstr);

	//获取转换后的长度

	int nbytes = WideCharToMultiByte( 
		0,       // specify the code page used to perform the conversion
		0,       // no special flags to handle unmapped characters
		pwstr,   // wide character string to convert
		nlength, // the number of wide characters in that string
		NULL,    // no output buffer given, we just want to know how long it needs to be
		0,
		NULL,    // no replacement character given
		NULL );  // we don't want to know if a character didn't make it through the translation

	// make sure the buffer is big enough for this, making it larger if necessary
	if(nbytes > len)   nbytes = len;

	// 通过以上得到的结果,转换unicode 字符为ascii 字符

	WideCharToMultiByte( 
		0,        // specify the code page used to perform the conversion
		0,        // no special flags to handle unmapped characters
		pwstr,   // wide character string to convert
		nlength, // the number of wide characters in that string
		pcstr,   // put the output ascii characters at the end of the buffer
		nbytes,  // there is at least this much space there
		NULL,    // no replacement character given
		NULL );

	return pcstr ;
}
#include ""
#include ""

int _tmain(int argc, _TCHAR* argv[])
{
	HRESULT hr = S_OK;
	P2PFileShare oShare;
	int nCount = 0;

	hr = ();//启动工作线程

	if (hr == S_OK)
	{
		hr = ("D:\\Data");

		if (hr == S_OK)
		{
			while (true)
			{
				if (nCount > 600)
				{
					break;
				}

				nCount++;

				Sleep(1000);
			}
		}
		else
		{
			cout << "失败!" << endl;
		}

		();
	}
	else
	{
		cout << "()失败!" << endl;
	}

	return 0;
}