同步机制——互斥体锁、读/写锁、信号量锁、条件变量

时间:2021-11-23 15:14:38

(区别于linux内核所用的自旋锁和互斥锁,本文中讨论的锁用于普通编程)

当两个或多个并发线程的执行次序造成了意想不到的错误结果时,“竞态条件”就是会产生。防止“竞态条件”的一个方法是使用同步机制,对访问“共享资源”的代码中关键段实施“品行访问”控制。常用的OS同步机制有:互斥体(mutex)、“多读取者/单写入者”锁(reader/writer locks)、信号量(semaphores)和条件变量(condition variable)。


1.互斥体(mutex,Mutual Exclusion)锁
当共享资源被多个线程并发访问时,为了确保这些资源的完整性,我们可以使用互斥体(mutex)锁。互斥体可用来串行执行多个线程,这需要在代码中确定关键 (critical section)——即,一次只能由一个线程执行的代码。“互斥体”语义像双括号那样具有“对称体”:也就是就说,如果一个线程拥有互斥体,那么,它还和负责释放这个互斥体。这种简单的语义有助于互斥体的高效实现——如通过硬件自旋锁(spinlock)来实现。

常见的互斥体有两种:
非递归互斥体(nonrecursive mutex):如果当前拥有互斥体的线程在没有首先释放它的情况,试图再次获得它,就会导致死锁或失败。
递归互斥体(recursive mutex):拥有互斥体的线程可能多次获得它而不会产生自死锁,只要这个线程最终以相同次数释放这个互斥体即可。

2.Readers/Writer锁
Readers/Writer(多读取者/单写入者)锁可以通过以下方式之一访问共享资源:
多个线程并发读取资源,但不修改它;
一次只一个线程修改资源,此时其它线程都不能对其进行读/写访问。

Readers/Writer锁可以用来保护“读操作比写操作频繁”的资源,从而提高并发应用程序的性能。在实现Readers/Writer锁时,要么给“读取者”以优先权,要么给“写入者”以优先权。

Readers/Writer 锁和互斥体有某些共性,例如:获得锁的线程也必须释放这个锁,如查一个“写入者”希望获得这个锁,那么,这个线程必须等待其它所有“拥有这个锁”的线程释放它;然后,这个“写入者”线程单独占有这个锁。便但和互斥体不同的是,多个线程可以同时获得一个Readers/Writer锁执行“读”操作。

3.信号量锁
从概念上说,信号量(semaphore)是可以原子(automically)递增和背叛的非负整数。如果一个线程试图递减一个信号量,但这个信号量的值已经为0,则线程将会阻塞。另一个线程“发出(post)”这个信号(semaphore),使用信号量大于0之后,被阻塞的线程才会被释放。

信号量维护状态信息,对信号计数值(count)和被阻塞线程的数量进行记录。它们一般是通过“休止锁(sleep lock)”来实现的;休止锁用来触发环境切换,以允许其他线程执行。和互斥体不同的是,释放信号量的线程不必是最初获得这个信号量的线程。这使得信号量适用于更广泛的执行环境,如信号处理程序或中断处理程序。

4.条件变量
和互斥体、Readers/Writer锁、信号量锁不同,条件变量(condtion variable)提供了不同风格的同步方式。在前三种机制中,当“占有锁的线程”在关键段中执行代码时,其他线程会等待。下此相反,使用条件变量,线程可以调整和调度自己的处理过程。

例如,某一数据被其他线程共享;条件变量可能使用处于等待状态,直至“涉及这个数据”的一个条件表达式达到某一状态。当一个“合作线程(cooperation thread)”显示共享数据的状态已经改变时,阻塞在条件上的线程会被唤醒;然后,被唤醒的线程重新计算它的条件表达式,如果共享数据达到预期状态,则恢复处理。再复杂的条件表达式也可以通过条件变量来等待;所以,较之前面所说的同步机制,条件变量允许更复杂的调度决策。
 

Windows 平台下的同步机制 (1)– 临界区(CriticalSection)

临界区的使用在线程同步中应该算是比较简单,说它简单还是说它同后面讲到的其它方法相比更容易理解。举个简单的例子:比如说有一个全局变量(公共资源)两个线程都会对它进行写操作和读操作,如果我们在这里不加以控制,会产生意想不到的结果。假设线程A正在把全局变量加1然后打印在屏幕上,但是这时切换到线程B,线程B又把全局变量加1然后又切换到线程A,这时候线程A打印的结果就不是程序想要的结果,也就产生了错误。解决的办法就是设置一个区域,让线程A在操纵全局变量的时候进行加锁,线程B如果想操纵这个全局变量就要等待线程A释放这个锁,这个也就是临界区的概念。

使用方法:
CRITICAL_SECTION cs;
InitializeCriticalSection(&cs);
EnterCriticalSection(&cs);

LeaveCriticalSection(&cs);
DeleteCriticalSection(&cs);

#include "stdafx.h"
#include <windows.h>
#include <process.h>
#include <iostream>
using namespace std;

/****************************************************************
*在使用临界区的时候要注意,每一个共享资源就有一个CRITICAL_SECTION
*如果要一次访问多个共享变量,各个线程要保证访问的顺序一致,如果不
*一致,很可能发生死锁。例如:
*   thread one:
*   EnterCriticalSection(&c1)
*   EnterCriticalSection(&c2)
*   …
*   Leave…
*   Leave…
*
*   thread two:
*   EnterCriticalSection(&c2);
*   EnterCriticalSection(&c1);
*   …
*   Leave…
*   Leave…
*这样的情况就会发生死锁,应该让线程2进入临界区的顺序同线程1相同
****************************************************************/

封装类:

class Lock;

class CLock 
{
public:
    CLock() { ::InitializeCriticalSection(&_cs); }
    ~CLock() { ::DeleteCriticalSection(&_cs); }
private:
    friend class Lock;
    CRITICAL_SECTION _cs;

    CLock(const CLock&);
    CLock& operator=(const CLock&);
};

class Lock
{
public:
    Lock(CLock& lock) : _lock(lock) { ::EnterCriticalSection(&_lock._cs); }
    ~Lock() { ::LeaveCriticalSection(&_lock._cs); }
private:
    CLock& _lock;
    Lock(const Lock&);
    Lock& operator=(const Lock&);
};


Windows 平台下的同步机制 (2)– 互斥体(Mutex)

windows api中提供了一个互斥体,功能上要比临界区强大。Mutex是互斥体的意思,当一个线程持有一个Mutex时,其它线程申请持有同一个Mutex会被阻塞,因此可以通过Mutex来保证对某一资源的互斥访问(即同一时间最多只有一个线程访问)。
调用CreateMutex可以创建或打开一个Mutex对象,其原型如下

HANDLE CreateMutex(
  LPSECURITY_ATTRIBUTES lpMutexAttributes,
  BOOL bInitialOwner,
  LPCTSTR lpName
);

其中参数lpMutexAttributes用来设定Mutex对象的安全描述符和是否允许子进程继承句柄。bInitialOwner表明是否将Mutex的持有者设置为调用线程。lpName参数设置Mutex的名字,该名字区分大小写并不能包含"",最大长度为MAX_PATH,可设置为NULL表明该Mutex为匿名对象。
如果调用成功,则返回Mutex的句柄,否则返回NULL,如果lpName不为NULL且调用前同名的Mutex已被创建,则返回同名Mutex的句柄,此时调用GetLastError将返回ERROR_ALREADY_EXISTS,参数bInitialOwner将被忽略。

还可以调用OpenMutex打开创建的非匿名Mutex,原型如下

HANDLE OpenMutex(
  DWORD dwDesiredAccess,
  BOOL bInheritHandle,
  LPCTSTR lpName
);

在成功创建或打开Mutex后,可以使用wait functions来等待并获取Mutex的持有权。

下面的例子用来通过Mutex对象控制某一应用程序只运行一次

    int WinMain(HINSTANCE hInstance, HINSTANCE hPrevInstance, LPSTR lpCmdLine, int nCmdShow)
    {
        HANDLE hMutex = CreateMutex(NULL, FALSE, "Mutex_Only_One_Instance_Allowed");
        if (NULL == hMutex)
        {
            Error("Create mutex error.");
            return -1;
        }
        DWORD dw = WaitForSingleObject(hMutex, 0);
        if (WAIT_FAILED == dw)
        {
            Error("Wait for mutex error.");
            CloseHandle(hMutex); // 释放句柄,当指向同一系统对象的所有句柄释放后,该对象将被删除。
            return -1;
        }
        else if (WAIT_TIMEOUT == dw)
        {
            // 另外一个实例正在运行
            CloseHandle(hMutex);
            return 1;
        }

        // 没有其它实例在运行,本实例将继续运行
        // 在此实现必要的功能性代码,如创建窗口,进入消息循环
        // ……………

        ReleaseMutex(hMutex); // 释放hMutex的持有权,注意这并不等同于删除Mutex对象
        CloseHandle(hMutex);

        return 0;
    }

其中WaitForSingleObject是等待特定对象发出信号(signaled),而Mutex对象在没有任何线程持有时会发出信号。

与临界区(critical section)有什么区别,为什么强大?它们有以下几点不一致:
1.critical section是局部对象,而mutex是核心对象。因此像waitforsingleobject是不可以等待临界区的。
2.critical section是快速高效的,而mutex同其相比要慢很多
3.critical section使用范围是单一进程中的各个线程,而mutex由于可以有一个名字,因此它是可以应用于不同的进程,当然也可以应用于同一个进程中的不同线程。
4.critical section 无法检测到是否被某一个线程释放,而mutex在某一个线程结束之后会产生一个abandoned的信息。同时mutex只能被拥有它的线程释放。下面举两个应用mutex的例子,一个是程序只能运行一个实例,也就是说同一个程序如果已经运行了,就不能再运行了;另一个是关于非常经典的哲学家吃饭问题的例子。

互斥体通常用于多进程之间的同步问题

程序运行单个实例:
#include "stdafx.h"
#include <windows.h>
#include <process.h>
#include <iostream>
using namespace std;

//当输入s或者c时候结束程序
void PrintInfo(HANDLE& h, char t)
{
    char c;
    while (1)
    {
        cin >> c;
        if (c == t)
        {
            ReleaseMutex(h);
            CloseHandle(h);
            break;
        }
        Sleep(100);
    }
}
int main(int argc, char* argv[])
{
    //创建mutex,当已经程序发现已经有这个mutex时候,就相当于openmutex
    HANDLE hHandle = CreateMutex(NULL, FALSE, "mutex_test");
    if (GetLastError() == ERROR_ALREADY_EXISTS)
    {
        cout << "you had run this program!" << endl;
        cout << "input c to close this window" << endl;
        PrintInfo(hHandle, ‘c’);
        return 1;
    }
    cout << "program run!" << endl;
    cout << "input s to exit program" <<endl;
    PrintInfo(hHandle, ‘s’);
    return 1;
}

封装:

struct _Lock
{
    _Lock(HANDLE& mtx) : _mtx(mtx), _ret(WAIT_OBJECT_0)
    {
        if (_mtx != 0)
            _ret = WaitForSingleObject(_mtx, 100000);
    }
    ~_Lock()
    {
        Release();
    }

    void Release()
    {
        if (_mtx != 0 && _ret == WAIT_OBJECT_0)
            ReleaseMutex(_mtx);
    }
    HANDLE& _mtx;
    DWORD _ret;
};

 

哲学家吃饭问题

const int PHILOSOPHERS = 5;          //哲学家人数
const int TIME_EATING = 50;         //吃饭需要的时间 毫秒
HANDLE event[PHILOSOPHERS];    //主线程同工作线程保持同步的句柄数组
HANDLE mutex[PHILOSOPHERS];   //mutex数组,这里相当于公共资源筷子
CRITICAL_SECTION cs;                //控制打印的临界区变量

UINT WINAPI ThreadFunc(void* arg)
{
    int num = (int)arg;
    DWORD ret = 0;
    while (1)
    {
        ret = WaitForMultipleObjects(2, mutex, TRUE, 1000);
        if (ret == WAIT_TIMEOUT)
        {
            Sleep(100);
            continue;
        }
        EnterCriticalSection(&cs);
            cout << "philosopher " << num << " eatting" << endl;
        LeaveCriticalSection(&cs);
        Sleep(TIME_EATING);
        break;
    }
    //设置时间为有信号
    SetEvent(event[num]);
    return 1;
}
int main(int argc, char* argv[])
{
    HANDLE hThread;
    InitializeCriticalSection(&cs);
    //循环建立线程
    for (int i = 0; i < PHILOSOPHERS; i++)
    {
        mutex[i] = CreateMutex(NULL, FALSE, "");
        event[i] = CreateEvent(NULL, TRUE, FALSE, "");
        hThread = (HANDLE)_beginthreadex(NULL, 0, ThreadFunc, (void*)i, 0, NULL);
        if (hThread == 0)
        {
            cout << "create thread " << i << "failed with code: "
                << GetLastError() << endl;
            DeleteCriticalSection(&cs);
            return -1;
        }
        CloseHandle(hThread);
    }
    //等待所有的哲学家吃饭结束
    DWORD ret = WaitForMultipleObjects(PHILOSOPHERS, event, TRUE, INFINITE);
    if (ret == WAIT_OBJECT_0)
    {
        cout << "all the philosophers had a dinner!" << endl;
    }
    else
    {
        cout << "WaitForMultipleObjects failed with code: " << GetLastError() << endl;
    }
    DeleteCriticalSection(&cs);
    for (int j = 0; j < PHILOSOPHERS; j++)
    {
        CloseHandle(mutex[j]);
    }
    return 1;
}


Windows 平台下的同步机制 (3)– 事件(Event)

事件对象的特点是它可以应用在重叠I/O(overlapped I/0)上,比如说socket编程中有两种模型,一种是重叠I/0,一种是完成端口都是可以使用事件同步。它也是核心对象,因此可以被waitforsingleobje这些函数等待;事件可以有名字,因此可以被其他进程开启。

Event即事件是一种用于进行线程/进程间同步的对象,事件有置位和复位两种状态,当线程通过waiting functions等待Event对象置位时该线程将进入阻塞状态,当该Event对象被置位或等待超时后,等待的线程将恢复执行。Event可以用在一个线程要等待其它线程时。
可以使用CreateEvent创建Event对象
HANDLE WINAPI CreateEvent(
    LPSECURITY_ATTRIBUTES lpEventAttributes,
    BOOL bManualReset,
    BOOL bInitialState,
    LPCTSTR lpName
);
lpEventAttributes用于指定Event对象的安全属性,包括句柄是否可被子进程继承和对象的安全描述符。可设置NULL取默认安全属性。
bManualReset表明Event对象是否需要手动复位。如果该参数为TRUE,则Event对象需要通过ResetEvent函数手动复位。如果该参数为FALSE,则Event被创建为自动复位的Event,任何等待的线程被恢复执行后,该Event将被系统自动复位。打个比方,如果有10个线程在等待一个Event,这时将Event置位,如果这是个手动复位Event,那么这10个线程将被依次唤醒直到通过ResetEvent调用将该Event复位;如果Event为自动复位Event,那么10个线程中的第一个被唤醒后Event被自动复位,其它线程将继续等待。
bInitialState参数表明Event对象被创建后默认是否置位。
lpName参数是Event的名字,可以为空表明将创建匿名Event。
CreateEvent函数在调用成功后返回Event句柄。如果同名Event已经存在,则返回这个已经存在了的Event的句柄,此时调用GetLastError函数将返回 ERROR_ALREADY_EXISTS。
还可以通过OpenEvent打开一个已经创建的非匿名Event
HANDLE WINAPI OpenEvent(
    DWORD dwDesiredAccess,
    BOOL bInheritHandle,
    LPCTSTR lpName
);
在创建或打开了Event对象之后,可以使用SetEvent和ResetEvent函数来置位或复位一个Event对象。
BOOL WINAPI SetEvent(
    HANDLE hEvent
);
BOOL WINAPI ResetEvent(
    HANDLE hEvent
);
要等待一个或多个Event对象置位可以使用wait functions。
简单示例,一个线程不停读取用户输入并放入message列表,另一个线程模拟将message发送出去,如果没有消息,则发送线程处于阻塞状态等待,一旦有消息录入,输入线程将event置位,发送线程即被激活并逐个发送消息。
#include "stdafx.h"
#include <windows.h>
#include <tchar.h>
#include <iostream>
#include <list>
#include <string>
using namespace std;

#ifdef _UNICODE
typedef wstring tstring;
#define tcout wcout
#define tcin wcin
#else
typedef string tstring;
#define tcout cout
#define tcin cin
#endif /* _UNICODE */

typedef list<tstring> StringList;

HANDLE hMutex = NULL;
HANDLE hEvent = NULL;
HANDLE hSendThread = NULL;
StringList messages;

bool isRunning;

DWORD WINAPI SendThreadProc(LPVOID lpThreadParameter)
{
DWORD dw;
while(isRunning)
{
  dw = WaitForSingleObject(hEvent, INFINITE);
  if(dw != WAIT_OBJECT_0)
  {
   tcout << _T("Wait error.") << endl;
   return -1;
  }
  dw = WaitForSingleObject(hMutex, INFINITE);
  if(WAIT_OBJECT_0 != dw && WAIT_ABANDONED != dw)
  {
   tcout << _T("Wait error.") << endl;
   return -2;
  }

  StringList list(messages);
  messages.clear();

  ReleaseMutex(hMutex);

  for(StringList::iterator i = list.begin(); i != list.end(); i++)
  {
   Sleep(1000); //休眠1秒模拟发送所耗时间

   tcout << _T("/* Send Message:") << *i << _T(" */");
  }

}

return 0;
}

int _tmain(int argc, _TCHAR* argv[])
{
hMutex = CreateMutex(NULL, FALSE, NULL);
hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
isRunning = true;

hSendThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)SendThreadProc, NULL, 0, NULL);

while(isRunning)
{
  tstring s;
  tcin >> s;
  if(s == _T("quit"))
  {
   isRunning = true;
   break;
  }

  DWORD dw = WaitForSingleObject(hMutex, INFINITE);
  if(WAIT_OBJECT_0 != dw && WAIT_ABANDONED != dw)
  {
   tcout << _T("Wait error.") << endl;
   return -1;
  }
  messages.push_back(s);
  ReleaseMutex(hMutex);
  SetEvent(hEvent);
}

CloseHandle(hMutex);
CloseHandle(hEvent);
CloseHandle(hSendThread);

return 0;

}


Windows 平台下的同步机制 (4)– 信号量(Semaphore)

Semaphore是旗语的意思,在Windows中,Semaphore对象用来控制对资源的并发访问数。Semaphore对象具有一个计数值,当值大于0时,Semaphore被置信号,当计数值等于0时,Semaphore被清除信号。每次针对Semaphore的wait functions返回时,计数值被减1,调用ReleaseSemaphore可以将计数值增加 lReleaseCount 参数值指定的值。

CreateSemaphore函数用于创建一个Semaphore

HANDLE CreateSemaphore(
LPSECURITY_ATTRIBUTES lpSemaphoreAttributes,
LONG lInitialCount,
LONG lMaximumCount,
LPCTSTR lpName
);

lpSemaphoreAttributes为安全属性,
lInitialCount为Semaphore的初始值,
lMaximumCount为最大值,
lpName为Semaphore对象的名字,NULL表示创建匿名Semaphore

此外还可以调用OpenSemaphore来打开已经创建的非匿名Semaphore

HANDLE OpenSemaphore(
DWORD dwDesiredAccess,
BOOL bInheritHandle,
LPCTSTR lpName
);

调用ReleaseSemaphore增加Semaphore计算值

BOOL ReleaseSemaphore(
HANDLE hSemaphore,
LONG lReleaseCount,
LPLONG lpPreviousCount
);

lpReleaseCount参数表示要增加的数值,
lpPreviousCount参数用于返回之前的计算值,如果不需要可以设置为NULL

比如我们要控制到服务器的连接数不超过10个,可以创建一个Semaphore,初值为10,每当要连接到服务器时,使用WaitForSingleObject请求Semaphore,当成功返回后再尝试连接到服务器,当连接失败或连接使用完后释放时,调用ReleaseSemaphore增加Semaphore计数值。

看个例子,popo现在好像在本机只能运行三个实例,mutex可以让程序只是运行一个实例,下面我通过信号量机制让程序像popo一样运行三个实例。

#include "stdafx.h"
#include <windows.h>
#include <iostream>
using namespace std;

const int MAX_RUNNUM = 3;  //最多运行实例个数
void PrintInfo()
{
    char c;
    cout << "run program" << endl;
    cout << "input s to exit program!" << endl;
    while (1)
    {
        cin >> c;
        if (c == ‘s’)
        {
            break;
        }
        Sleep(10);
    }
}
int main(int argc, char* argv[])
{
    HANDLE hSe = CreateSemaphore(NULL, MAX_RUNNUM, MAX_RUNNUM, "semaphore_test");
    DWORD ret = 0;
    if (hSe == NULL)
    {
        cout << "createsemaphore failed with code: " << GetLastError() << endl;
        return -1;
    }
    ret = WaitForSingleObject(hSe, 1000);
    if (ret == WAIT_TIMEOUT)
    {
        cout << "you have runned " << MAX_RUNNUM << " program!" << endl;
        ret = WaitForSingleObject(hSe, INFINITE);
    }
    PrintInfo();
    ReleaseSemaphore(hSe, 1, NULL);
    CloseHandle(hSe);
    return 0;
}

From MSDN:

Using Semaphore Objects

The following example uses a semaphore object to limit the number of threads that can perform a particular task. First, it uses theCreateSemaphore function to create the semaphore and to specify initial and maximum counts, then it uses theCreateThread function to create the threads.

Before a thread attempts to perform the task, it uses the WaitForSingleObject function to determine whether the semaphore’s current count permits it to do so. The wait function’s time-out parameter is set to zero, so the function returns immediately if the semaphore is in the nonsignaled state.WaitForSingleObject decrements the semaphore’s count by one.

When a thread completes the task, it uses the ReleaseSemaphore function to increment the semaphore’s count, thus enabling another waiting thread to perform the task.

Copy

#include <windows.h>
#include <stdio.h>

#define MAX_SEM_COUNT 10
#define THREADCOUNT 12

HANDLE ghSemaphore;

DWORD WINAPI ThreadProc( LPVOID );

void main()
{
HANDLE aThread[THREADCOUNT];
DWORD ThreadID;
int i;

// Create a semaphore with initial and max counts of MAX_SEM_COUNT

ghSemaphore = CreateSemaphore(
NULL, // default security attributes
MAX_SEM_COUNT, // initial count
MAX_SEM_COUNT, // maximum count
NULL); // unnamed semaphore

if (ghSemaphore == NULL)
{
printf("CreateSemaphore error: %dn", GetLastError());
return;
}

// Create worker threads

for( i=0; i < THREADCOUNT; i++ )
{
aThread[i] = CreateThread(
NULL, // default security attributes
0, // default stack size
(LPTHREAD_START_ROUTINE) ThreadProc,
NULL, // no thread function arguments
0, // default creation flags
&ThreadID); // receive thread identifier

if( aThread[i] == NULL )
{
printf("CreateThread error: %dn", GetLastError());
return;
}
}

// Wait for all threads to terminate

WaitForMultipleObjects(THREADCOUNT, aThread, TRUE, INFINITE);

// Close thread and semaphore handles

for( i=0; i < THREADCOUNT; i++ )
CloseHandle(aThread[i]);

CloseHandle(ghSemaphore);
}

DWORD WINAPI ThreadProc( LPVOID lpParam )
{
DWORD dwWaitResult;
BOOL bContinue=TRUE;

while(bContinue)
{
// Try to enter the semaphore gate.

dwWaitResult = WaitForSingleObject(
ghSemaphore, // handle to semaphore
0L); // zero-second time-out interval

switch (dwWaitResult)
{
// The semaphore object was signaled.
case WAIT_OBJECT_0:
// TODO: Perform task
printf("Thread %d: wait succeededn", GetCurrentThreadId());
bContinue=FALSE;

// Simulate thread spending time on task
Sleep(5);

// Release the semaphore when task is finished

if (!ReleaseSemaphore(
ghSemaphore, // handle to semaphore
1, // increase count by one
NULL) ) // not interested in previous count
{
printf("ReleaseSemaphore error: %dn", GetLastError());
}
break;

// The semaphore was nonsignaled, so a time-out occurred.
case WAIT_TIMEOUT:
printf("Thread %d: wait timed outn", GetCurrentThreadId());
break;
}
}
return TRUE;
}