Python中的threading

时间:2023-03-09 06:16:49
Python中的threading

Python中的threading

RLock——重入锁

RLock在Python中的实现是对Lock的封装,具体在类中维护了一个重入次数的变量。一旦一个线程获得一个RLock,该线程再次要求获得该锁时不会阻塞,但该线程获得多少次该锁,则必须释放多少次。一个重入锁必须由获得该锁的线程释放。

源码实现:

数据结构:

  • __block:普通Lock
  • __owner:该锁的拥有者线程
  • __count:该锁被拥有者线程重入的次数

具体方法:

acquire(self, blocking=1): 获取锁
def acquire(self, blocking=1):
me = _get_ident() # 得到调用该函数线程的ID
if self.__owner == me: # 与self.__owner对比
self.__count = self.__count + 1 # 相等,则说明之前该锁就属于该线程,则重入次数自增
return 1 # 返回获取成功
rc = self.__block.acquire(blocking) # 不等的话,则尝试获取__block锁
if rc: # 如果获取成功
self.__owner = me # 则将该锁的拥有者赋值为该线程
self.__count = 1 # 并将重入次数自增
return rc # 返回获取成功与否
release(self):释放锁
def release(self):
if self.__owner != _get_ident(): # 如果当前该锁的拥有者线程不是调用该函数的线程,抛出异常
raise RuntimeError("cannot release un-acquired lock")
self.__count = count = self.__count - 1 # 否则,重入次数自减
if not count: # 当重入次数为0时
self.__owner = None # 该锁拥有者为None
self.__block.release() # 且释放__block

Condition——条件变量

Condition对象允许一个或多个线程等待直到它们被其它线程通知,然后继续执行。Condition除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法。可以将Condition看成一个有条件的锁,其中增加了当不满足条件时,需等待的接口。

一个线程首先调用condition的acquire方法,然后判断一些条件,如不满足则wait;如果条件满足,进行一些处理,通过notify方法来通知其他线程。

源码实现:

数据结构:

  • __lock:Lock或者RLock实例,Condition中锁的实现
  • __waiters:等待的线程

具体方法:

wait(self, timeout=None):条件不满足时等待
def wait(self, timeout=None):
if not self._is_owned(): # 调用该函数前,必须先获取锁
raise RuntimeError("cannot wait on un-acquired lock")
waiter = _allocate_lock() # 申请一个普通的Lock
waiter.acquire() # 首先先获取了,再获取需阻塞
self.__waiters.append(waiter) # 即将该线程挂到锁上面,再把锁弄进等待队列
saved_state = self._release_save() # 释放锁,保存锁的一些状态,当为重入锁时,则需保存锁的拥有者和重入次数状态
try: # restore state no matter what (e.g., KeyboardInterrupt)
if timeout is None: # 当timeout为空,则代表永久阻塞
waiter.acquire() # 再获取,即该线程会阻塞到该锁上
else:
# Balancing act: We can't afford a pure busy loop, so we
# have to sleep; but if we sleep the whole timeout time,
# we'll be unresponsive. The scheme here sleeps very
# little at first, longer as time goes on, but never longer
# than 20 times per second (or the timeout time remaining).
endtime = _time() + timeout
delay = 0.0005 # 500 us -> initial delay of 1 ms
while True:
gotit = waiter.acquire(0)
if gotit: 如果在规定时间内有其他线程通知了该线程,则退出
break
remaining = endtime - _time()
if remaining <= 0: # 如果超时,则退出
break
delay = min(delay * 2, remaining, .05) # 获取需要的睡眠时间,每次延长
_sleep(delay)
if not gotit:
try:
self.__waiters.remove(waiter)
except ValueError:
pass
finally:
self._acquire_restore(saved_state) # 最后竞争该锁,恢复状态
notify(self, n=1):唤醒n个线程
def notify(self, n=1):
if not self._is_owned(): # 必须拥有锁的状态下,才能调用notify
raise RuntimeError("cannot notify on un-acquired lock")
__waiters = self.__waiters
waiters = __waiters[:n] # 唤醒前n个
if not waiters:
return
for waiter in waiters:
waiter.release() # 唤醒
try:
__waiters.remove(waiter) # 从等待的队列中移除
except ValueError:
pass
notifyAll(self):唤醒所有在该condition上等待的线程
def notifyAll(self):
self.notify(len(self.__waiters))

Semaphore——信号量

Semaphore维护了一个计数器,代表release调用的次数 - acquire调用次数 + 计数器初始值。最简单的应用为操作系统中,某个资源(例如打印机)的个数为n,线程的数量为m,当线程抢占资源时,可以使用Semaphore来保证资源利用的合理性。

源码实现:

数据结构:

  • __cond: 为Condition(Lock()),当资源数不够时,调用__condwait()方法来等待其他线程释放资源的通知(notify())
  • __value: 为int型,代表初始资源的个数

具体方法:

acqiure(self, blocking=1):抢占其中一个资源
def acquire(self, blocking=1):
rc = False
with self.__cond: # 首先抢占条件变量的锁,以下代码为多个线程互斥访问
while self.__value == 0:
if not blocking:
break
self.__cond.wait() # 当资源为0时, 则等待别的线程释放,唤醒则继续判断资源个数
else:
self.__value = self.__value - 1 # 否则资源足够,则减一,表示获取一个资源
rc = True
return rc
release(self):释放一个资源
def release(self):
with self.__cond: # 同样,先抢占条件变量锁,以下代码互斥访问
self.__value = self.__value + 1 # 资源数加一,表示该线程用好资源了
self.__cond.notify() # 唤醒其中一个线程叫它来抢资源

Event——事件监听

Event机制类似于事件的监听机制,其他线程调用wait()方法阻塞监听一个事件的发生,当某个线程检测到事件的发生,会向所有等待的线程发送一个信号,等待线程重新唤醒继续执行。

源码实现

数据结构

  • __cond: 为Condition(Lock()),当事件未发生时,调用__condwait()方法阻塞等待事件发生
  • __flag:初始值为false,代表事件未发生,当发生时,置为true

具体方法

wait(self, timeout=None):等待直到__flag为true
def wait(self, timeout=None):
self.__cond.acquire() # 抢占条件变量的锁
try:
if not self.__flag: # 查看事件发生与否
self.__cond.wait(timeout) # 如未发生,则阻塞
return self.__flag
finally:
self.__cond.release()
set(self):设置__flag为true,表明事件的发生
def set(self):
self.__cond.acquire() # 抢占条件变量的锁
try:
self.__flag = True # 设置事件发生
self.__cond.notify_all() # 通知所有等待的线程
finally:
self.__cond.release()