Python Concurrency and Parallelism [1] -> Threads [2] -> Locks and Semaphores

Time: 2023-03-09 15:38:19

Locks and Semaphores


Contents

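  1. Adding a thread lock
  2. What a lock really is
  3. Mutex locks and reentrant locks
  4. How deadlocks arise
  5. Locks as context managers
  6. Semaphores and bounded semaphores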

1 Adding a thread lock

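Threads compete for shared resources in an unpredictable order, so they can interfere with each other. A thread lock controls access to a shared resource so that only one thread modifies it at a time.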

import atexit
from random import randrange
from threading import Thread, Lock, current_thread  # currentThread is a deprecated alias
from time import ctime, sleep


# Use a set to record the running threads
# Printing the set directly would show its default repr (class name, braces, quotes)
# So we override __str__ to display it cleanly as: a, b, c
class CleanOutputSet(set):
    def __str__(self):
        return ', '.join(x for x in self)

# randrange(2, 5) returns an integer in range(2, 5), i.e. 2, 3 or 4
# for x in range(randrange(3, 7)) runs the loop 3-6 times
# So the code below builds a list of 3-6 numbers, each between 2 and 4
# Without list() we would get a generator, which is exhausted after one iteration
loops = list(randrange(2, 5) for x in range(randrange(3, 7)))

remaining = CleanOutputSet()
lock = Lock()

def loop_without_lock(nsec):
    myname = current_thread().name
    remaining.add(myname)
    print('[%s] Start %s' % (ctime(), myname))
    sleep(nsec)
    remaining.remove(myname)
    print('[%s] Completed %s (%d secs)' % (ctime(), myname, nsec))
    # Note: remaining or 'NONE' evaluates to 'NONE' when remaining is empty
    # Falsy values include: None, False, (), [], {}, set(), 0
    print(' (remaining: %s)' % (remaining or 'NONE'))

def loop_with_lock(nsec):
    myname = current_thread().name
    # Acquire the lock before modifying the shared resource, to block other threads
    # The acquire()/release() pair can be simplified with 'with lock:'
    lock.acquire()
    remaining.add(myname)
    print('[%s] Start %s' % (ctime(), myname))
    # After using the resource, release the lock so other threads can use it
    lock.release()
    sleep(nsec)

    lock.acquire()
    remaining.remove(myname)
    print('[%s] Completed %s (%d secs)' % (ctime(), myname, nsec))
    print(' (remaining: %s)' % (remaining or 'NONE'))
    lock.release()

def _main():
    print('-----Below threads without lock-----')
    threads = []
    for pause in loops:
        threads.append(Thread(target=loop_without_lock, args=(pause, )))
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print('-----Below threads with lock-----')
    threads = []
    for pause in loops:
        threads.append(Thread(target=loop_with_lock, args=(pause, )))
    for t in threads:
        t.start()
    for t in threads:
        t.join()

# This is an exit handler: it is called when the script exits
# atexit.register(_atexit) can be used instead of @atexit.register
# The function name '_atexit' can be changed to anything else
@atexit.register
def _atexit():
    print('All DONE at:', ctime())

if __name__ == '__main__':
    _main()

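Lines 1-4 import the required modules. atexit registers a handler that runs when the script exits, and random generates random numbers that make the thread timing less predictable.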

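Lines 7-12 define a new set class used to print the set of currently running threads. CleanOutputSet overrides __str__ so that the set is shown in the form a, b, c. Without the override you get the default representation (set([a, b, c]) in Python 2).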

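Lines 14-21 use the random module to build a sequence of 3-6 random numbers, each between 2 and 4. The values are reused later, so the generator expression is wrapped in list(). Otherwise it would be exhausted after one iteration and could not be reused. These lines also create the global lock and the set instance.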

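Lines 23-32 define the thread function without a lock. On entry it adds its thread name to the set. After sleeping for the given time it removes the name and prints the thread names still in the set (the shared resource).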

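Lines 34-49 define the thread function with a lock. On entry it acquires the lock, adds its thread name to the set, and releases the lock. After sleeping for the given time it acquires the lock again, removes the thread name, prints the remaining thread names, and finally releases the lock.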

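Lines 51-67: the main function runs the unlocked version and then the locked version. It calls join() on each group of threads, so the main thread waits for one run to finish before starting the next and the two runs stay separate.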

Lines 69-77 define the script's exit handler with the @atexit.register decorator (calling atexit.register(_atexit) is equivalent).

Final output

-----Below threads without lock-----
[Tue Aug 1 11:01:57 2017] Start Thread-1
[Tue Aug 1 11:01:57 2017] Start Thread-2[Tue Aug 1 11:01:57 2017] Start Thread-3
[Tue Aug 1 11:01:57 2017] Start Thread-4
[Tue Aug 1 11:01:59 2017] Completed Thread-3 (2 secs)
(remaining: Thread-1, Thread-4, Thread-2)
[Tue Aug 1 11:02:00 2017] Completed Thread-1 (3 secs)
(remaining: Thread-4, Thread-2)
[Tue Aug 1 11:02:01 2017] Completed Thread-2 (4 secs)[Tue Aug 1 11:02:01 2017] Completed Thread-4 (4 secs)
(remaining: NONE)
(remaining: NONE)
-----Below threads with lock-----
[Tue Aug 1 11:02:01 2017] Start Thread-5
[Tue Aug 1 11:02:01 2017] Start Thread-6
[Tue Aug 1 11:02:01 2017] Start Thread-7
[Tue Aug 1 11:02:01 2017] Start Thread-8
[Tue Aug 1 11:02:03 2017] Completed Thread-7 (2 secs)
(remaining: Thread-8, Thread-6, Thread-5)
[Tue Aug 1 11:02:04 2017] Completed Thread-5 (3 secs)
(remaining: Thread-8, Thread-6)
[Tue Aug 1 11:02:05 2017] Completed Thread-6 (4 secs)
(remaining: Thread-8)
[Tue Aug 1 11:02:05 2017] Completed Thread-8 (4 secs)
(remaining: NONE)

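The output shows two problems when there is no thread lock. The printed output can be garbled, with two messages running together on one line. Two threads can also both report that no threads remain (remaining: NONE), because each reads the set after the other has already removed its name. With the lock, the threads access the shared resource one at a time.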
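A minimal sketch of the same idea on a shared counter, assuming a hypothetical worker function `increment_many`. `counter += 1` on a global is a read-modify-write and is not atomic. The sketch therefore guards it with `with lock:`, the shorthand mentioned in the comments of loop_with_lock, so the final total is always the same.

```python
import threading

counter = 0
lock = threading.Lock()

def increment_many(n):
    """Hypothetical worker: add 1 to the shared counter n times."""
    global counter
    for _ in range(n):
        # Read-modify-write on shared state: only one thread at a time may do it
        with lock:
            counter += 1

threads = [threading.Thread(target=increment_many, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 40000
```

Taking the lock once per increment keeps the example simple. Taking it once around the whole loop would be faster but would fully serialize the workers.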

2 What a lock really is

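To show that a lock does not work by physically locking the resource, the code below creates two threads. The well-behaved thread follows the convention and modifies the shared resource only after acquiring the lock. The rogue thread never tries to acquire the lock and modifies the shared resource directly, without permission.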

from threading import Thread, Lock
import time
lock = Lock()
COUNT = 0

def gentleCounter():
    name = 'gentleCounter'
    lock.acquire()
    print('%s acquired the lock' % name)
    global COUNT
    COUNT += 1
    print('%s made a count plus, now the COUNT is %d' % (name, COUNT))
    print('%s is taking a rest...' % name)
    time.sleep(3)
    COUNT += 1
    print('%s made a count plus again, now the COUNT is %d' % (name, COUNT))
    lock.release()
    print('%s released the lock' % name)

def wildCounter():
    time.sleep(1)
    name = 'wildCounter'
    print('%s didn\'t acquire the lock' % name)
    global COUNT
    COUNT += 1
    print('%s made a count plus, now the COUNT is %d' % (name, COUNT))

Thread(target=gentleCounter).start()
Thread(target=wildCounter).start()

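Lines 1-4 import the Thread and Lock classes, create the lock, and define the global variable COUNT that the lock is meant to guard, with an initial value of 0.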

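Lines 6-18 define the well-behaved counter, which follows the rules. After acquiring the lock it increments COUNT, then sleeps for 3 seconds without releasing the lock, then increments COUNT again. Only after the second increment does it release the lock.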

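Lines 20-26 define the rogue counter, which never acquires the lock. It waits 1 second on entry, so it runs while the well-behaved thread is asleep. It then increments COUNT directly without holding the lock.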

Lines 28-29 start each function in its own thread.

Result

gentleCounter acquired the lock
gentleCounter made a count plus, now the COUNT is 1
gentleCounter is taking a rest...
wildCounter didn't acquire the lock
wildCounter made a count plus, now the COUNT is 2
gentleCounter made a count plus again, now the COUNT is 3
gentleCounter released the lock

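The output shows that wildCounter modified the shared resource while gentleCounter was asleep and still held the lock, which changed the result of gentleCounter's next count. A lock therefore does not physically lock the resource. It is a convention among threads that grants permission to modify the resource, and it only protects the resource if every thread that touches it acquires the lock first.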
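The same experiment can be made deterministic with events instead of sleeps. This is a sketch; `polite_writer`, `rogue_writer` and the two `Event` objects are hypothetical names introduced here. The polite thread holds the lock and waits for the rogue thread. The rogue thread never touches the lock, checks that the lock is in fact held, and writes anyway.

```python
import threading

lock = threading.Lock()
shared = []
observed = {}
holder_has_lock = threading.Event()
rogue_done = threading.Event()

def polite_writer():
    # Follows the convention: modifies `shared` only while holding the lock
    with lock:
        holder_has_lock.set()       # signal that the lock is now held
        rogue_done.wait(timeout=5)  # keep holding the lock while the rogue thread runs
        shared.append('polite')

def rogue_writer():
    holder_has_lock.wait(timeout=5)
    # Never calls lock.acquire(), so nothing prevents this write
    observed['locked'] = lock.locked()  # the other thread still holds the lock
    shared.append('rogue')
    rogue_done.set()

threads = [threading.Thread(target=polite_writer), threading.Thread(target=rogue_writer)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(observed['locked'], shared)  # True ['rogue', 'polite']
```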

3 Mutex locks and reentrant locks

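The Python threading module provides two kinds of lock: the mutex lock (threading.Lock) and the reentrant lock (threading.RLock). They are used in much the same way.

A mutex can be held only once at a time. A second acquire blocks, even in the thread that already holds the lock, until the lock is released.

A reentrant lock can be acquired several times by the thread that holds it. It is fully unlocked only after it has been released as many times as it was acquired, and it must be released by the same thread that acquired it.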

from threading import Lock, RLock

def call():
    print('This is call() function')
    with lock:
        g()
        h()

def g():
    if not lock.acquire(True, 1):  # blocking=True, timeout=1 second
        print('g() acquires lock failed')
    else:
        print('This is g() function')
        lock.release()
    h()

def h():
    if not lock.acquire(True, 1):  # blocking=True, timeout=1 second
        print('h() acquires lock failed')
    else:
        print('This is h() function')
        lock.release()

print('\n-------Using Lock-------')
lock = Lock()
call()
print('\n-------Using RLock-------')
lock = RLock()
call()

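Lines 3-22 define call(), which acquires the lock once with `with lock` and then calls g() and h(). Both g() and h() try to acquire the lock, waiting at most 1 second. On failure or timeout they print a failure message; on success they print their own name and release the lock. g() also calls h() one more time.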

Lines 24-29 run the program, first with a mutex and then with a reentrant lock.

Output

-------Using Lock-------
This is call() function
g() acquires lock failed
h() acquires lock failed
h() acquires lock failed

-------Using RLock-------
This is call() function
This is g() function
This is h() function
This is h() function

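The output shows that with the mutex, once call() holds the lock, no other function can acquire it again. With the reentrant lock, the other functions can acquire it again, as long as every acquire is paired with a release.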
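The difference can be reduced to a few lines. This sketch uses a hypothetical helper `try_reacquire` that holds a lock and then tries to acquire it again from the same thread. The `timeout` argument of `acquire()` turns what would be a permanent block on a plain Lock into a `False` return value.

```python
import threading

def try_reacquire(lock):
    """Hypothetical helper: hold `lock`, then try to acquire it again in the same thread.

    Returns True if the second acquire succeeded within 0.1 seconds.
    """
    with lock:
        ok = lock.acquire(timeout=0.1)  # a plain Lock would block forever without the timeout
        if ok:
            lock.release()  # balance the second acquire
        return ok

print(try_reacquire(threading.Lock()))   # False
print(try_reacquire(threading.RLock()))  # True
```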

Note: a mutex acquired by one thread can be released by a different thread. A reentrant lock, by contrast, must be released by the same thread that acquired it.
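A short sketch of this ownership rule. A Lock acquired in the main thread is released from a worker thread without complaint. An RLock released by a thread that does not own it raises RuntimeError, which the worker catches and records in a list (`errors` is a name introduced for this example).

```python
import threading

# A Lock acquired in the main thread may be released from another thread
mutex = threading.Lock()
mutex.acquire()
t = threading.Thread(target=mutex.release)
t.start()
t.join()
print(mutex.locked())  # False

# An RLock may only be released by the thread that owns it
rlock = threading.RLock()
rlock.acquire()
errors = []

def release_from_other_thread():
    try:
        rlock.release()
    except RuntimeError as exc:  # raised because this thread is not the owner
        errors.append(exc)

t = threading.Thread(target=release_from_other_thread)
t.start()
t.join()
print(len(errors))  # 1
rlock.release()  # the owning (main) thread can release it normally
```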

4 How deadlocks arise

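A deadlock occurs when a lock is requested more than once and none of the requesters is able to release it, so they all wait forever. Deadlocks take two broad forms: recursive deadlocks and mutual deadlocks.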

4.1 Recursive deadlock

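A recursive deadlock usually happens when the same thread acquires a lock more than once, for example through nested or recursive function calls, so that the lock can never be released. The root cause is still that acquire() blocks. Once a thread holds the lock, any further acquire() blocks, whether it comes from another thread or from the same one. Another thread can simply wait until the holder calls release(). But if the holder itself makes the second acquire(), it blocks and never reaches its own release(), and the result is a deadlock.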

from threading import Thread, Lock, RLock

mutex = Lock()
reentrant = RLock()

class MyThread(Thread):
    def __init__(self, lock):
        Thread.__init__(self)
        self.lock = lock

    def run(self):
        print('-------This is %s-------' % self.name)
        if self.lock.acquire():
            print('%s get lock one' % self.name, '\nTrying to get second lock')
            # With a plain Lock, this second acquire() blocks forever (self-deadlock)
            self.lock.acquire()
            print('Got second lock')
            print('Trying to release lock...')
            self.lock.release()
            print('First lock released')
            self.lock.release()
            print('Second lock released')
            print('Lock all released')
        print('--------Exit %s---------' % self.name)

t = MyThread(reentrant)
t.start()
t.join()
# This thread deadlocks, so this join() never returns and the script hangs
t = MyThread(mutex)
t.start()
t.join()

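In run(), the thread first acquires the lock it was given and then, without releasing it, acquires the same lock again. If the lock is a mutex, this causes a deadlock.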

Result

-------This is Thread-1-------
Thread-1 get lock one
Trying to get second lock
Got second lock
Trying to release lock...
First lock released
Second lock released
Lock all released
--------Exit Thread-1---------
-------This is Thread-2-------
Thread-2 get lock one
Trying to get second lock

The result shows that the reentrant lock completed both acquires and both releases, while the mutex deadlocked and blocked forever.
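Recursive code is the typical place where this matters. This is a minimal sketch with a hypothetical recursive function `countdown` that re-enters the same lock at every level. With an RLock each nested `with` only increments the owner's recursion count, so the recursion completes. With a plain Lock, the second level would block forever.

```python
import threading

rlock = threading.RLock()

def countdown(n, trace):
    """Hypothetical recursive function that re-acquires the same lock at every level."""
    with rlock:  # with threading.Lock() the second level would deadlock here
        trace.append(n)
        if n > 0:
            countdown(n - 1, trace)

trace = []
countdown(3, trace)
print(trace)  # [3, 2, 1, 0]
```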

4.2 Mutual deadlock

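A mutual deadlock occurs when threads each try to acquire a lock that another thread already holds. Each thread holds part of the resources and then waits for a lock held by the other, so all of them block forever.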

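Suppose there are two locks, A and B. Thread 1 acquires A, then B, then releases B, then A. Thread 2 acquires B, then A, then releases A, then B. When the two threads start, thread 1 takes lock A and thread 2 takes lock B. Now thread 1 needs B and thread 2 needs A, but each lock is held by the other thread and never released, so the two threads end up in a mutual deadlock.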

from threading import Thread, Lock, RLock, current_thread
import time

mutex_1 = Lock()
mutex_2 = Lock()
reentrant_1 = RLock()
reentrant_2 = RLock()
COUNT = 0
class MyThread(Thread):
    def __init__(self, lock_1, lock_2):
        Thread.__init__(self)
        self.lock_1 = lock_1
        self.lock_2 = lock_2

    def run(self):
        self.name = current_thread().name
        if self.lock_1.acquire():
            print('%s got its first lock' % self.name)
            global COUNT
            COUNT += 1
            print('%s make COUNT plus one, now COUNT is %d' % (self.name, COUNT))
            time.sleep(2)
            print('%s trying to get another one...' % self.name)
            if self.lock_2.acquire():
                print('%s got its second lock' % self.name)
                self.lock_2.release()
                print('%s release its second lock' % self.name)
            self.lock_1.release()
            print('%s release its first lock' % self.name)

threads = [MyThread(mutex_1, mutex_2), MyThread(mutex_2, mutex_1)]
#threads = [MyThread(reentrant_1, reentrant_2), MyThread(reentrant_2, reentrant_1)]
for t in threads:
    t.start()
for t in threads:
    t.join()

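Lines 8-29: after importing the required modules, the code defines a Thread subclass that takes two lock instances in order. In run(), the thread acquires the first lock it was given and increments COUNT. It then sleeps, so that the other thread has enough time to acquire its own first lock. Next it tries to acquire the second lock and, if that succeeds, releases the two locks in turn.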

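Lines 31-36 create the two threads. The only difference between them is the order in which the locks are passed in: thread 1 acquires lock 1 and then lock 2, while thread 2 does the opposite. You can pass in either mutexes or reentrant locks.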

Running the program gives

Thread-1 got its first lock
Thread-1 make COUNT plus one, now COUNT is 1
Thread-1 trying to get another one...
Thread-2 got its first lock
Thread-2 make COUNT plus one, now COUNT is 2
Thread-2 trying to get another one...

The result shows that both threads block while trying to acquire the lock held by the other, which is a mutual deadlock.
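The deadlock can be observed without hanging the program by giving the second acquire a timeout. This sketch uses hypothetical names (`worker`, `both_hold_first`, `both_tried`). Two `threading.Barrier` objects make the timing deterministic. The first ensures each thread holds its first lock before either tries for the second. The second keeps both first locks held until both attempts have timed out.

```python
import threading

lock_a = threading.Lock()
lock_b = threading.Lock()
both_hold_first = threading.Barrier(2)  # both threads hold their first lock
both_tried = threading.Barrier(2)       # both threads have tried for their second lock
results = {}

def worker(name, first, second):
    with first:
        both_hold_first.wait()
        # Without a timeout this acquire would block forever: the other thread holds `second`
        got_second = second.acquire(timeout=0.5)
        if got_second:
            second.release()
        results[name] = got_second
        both_tried.wait()  # keep holding `first` until the other thread has given up too

t1 = threading.Thread(target=worker, args=('thread-1', lock_a, lock_b))
t2 = threading.Thread(target=worker, args=('thread-2', lock_b, lock_a))
for t in (t1, t2):
    t.start()
for t in (t1, t2):
    t.join()

print(sorted(results.items()))  # [('thread-1', False), ('thread-2', False)]
```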

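Note: switching to reentrant locks does not solve the mutual deadlock. A reentrant lock is reentrant only for the thread that holds it; for every other thread it is still exclusive.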
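One common remedy, not covered in the original text, is lock ordering: every thread acquires the locks in the same global order, so no thread can hold one lock while waiting for a lock that comes earlier in that order. This is a minimal sketch assuming a hypothetical helper `ordered()` that sorts locks by `id()`. The two workers receive the locks in opposite orders, as in the deadlocking example, yet both run 1000 rounds to completion.

```python
import threading

def ordered(*locks):
    """Hypothetical helper: return the locks in one fixed global order (here, by id())."""
    return sorted(locks, key=id)

lock_1 = threading.Lock()
lock_2 = threading.Lock()
COUNT = 0

def worker(a, b, rounds=1000):
    global COUNT
    for _ in range(rounds):
        # Whatever order the caller passed, both threads lock in the same order
        first, second = ordered(a, b)
        with first:
            with second:
                COUNT += 1  # safe: this thread holds both locks

# Same argument orders as the deadlocking example: (1, 2) and (2, 1)
threads = [threading.Thread(target=worker, args=(lock_1, lock_2)),
           threading.Thread(target=worker, args=(lock_2, lock_1))]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(COUNT)  # 2000
```

Sorting by `id()` is only one way to define the order. Any fixed ranking of the locks that every thread respects works the same way.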

5 Locks as context managers

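Locks also support the context-manager protocol. `with lock:` acquires the lock on entry and releases it on exit, even if the block raises an exception. The example below uses with to manage the lock.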

import atexit
from random import randrange
import threading
from threading import Thread, Lock
from time import ctime, sleep

class CleanOutputSet(set):
    def __str__(self):
        return ', '.join(x for x in self)

# A generator would not work here: len(loops) is needed below,
# and a generator has no len() and can only be iterated once
loops = list(randrange(1, 7) for x in range(2, 8))
remaining = CleanOutputSet()
lock = Lock()

def loop(nsec):
    myname = threading.current_thread().name
    with lock:
        remaining.add(myname)
        print('[{0}] Start {1}'.format(ctime(), myname))
        # Once every thread has started (and none has finished yet), list the active threads
        if len(remaining) == len(loops):
            func_for_trial()
    sleep(nsec)
    with lock:
        remaining.remove(myname)
        print('[{0}] Completed {1} ({2} secs)'.format(ctime(), myname, nsec))
        print(' (remaining: {0})'.format(remaining or 'NONE'))

def func_for_trial():
    # active_count() and enumerate() include the main thread
    count = threading.active_count()
    active_thread = threading.enumerate()
    print('There are %d active threads, \n%s' % (count, str(active_thread).replace(', ', '\n')))

def _main():
    threads = []
    for pause in loops:
        threads.append(Thread(target=loop, args=(pause, )))
    for t in threads:
        t.start()
    for t in threads:
        t.join()

@atexit.register
def _atexit():
    print('All DONE at:', ctime())

if __name__ == '__main__':
    _main()
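`with lock:` is shorthand for an acquire/try/finally/release pattern. A minimal sketch showing the equivalent long form, and showing that the lock is released even when the protected block raises:

```python
import threading

lock = threading.Lock()

# What `with lock:` does, written out by hand
lock.acquire()
try:
    pass  # critical section
finally:
    lock.release()

# The lock is released even when the with-block raises
try:
    with lock:
        raise ValueError('something went wrong inside the critical section')
except ValueError:
    pass

print(lock.locked())  # False
```

This is why the with form is safer than the bare acquire()/release() calls used in the first example. An exception between those two calls would leave the lock held forever.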

6 Semaphores and bounded semaphores

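The semaphore is one of the oldest synchronization primitives. It is a counter that is decremented when a resource is consumed and incremented when the resource is released, so it can be thought of as recording how many units of a resource are available. Semaphores are more flexible than locks: they can coordinate multiple threads that each use one instance of a resource that has only a limited number of instances.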
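A minimal sketch of that use, assuming a hypothetical resource with two instances (`MAX_CONCURRENT = 2`) and six threads that want it. The semaphore lets at most two threads into the block at once. A separate lock protects the bookkeeping variables `active` and `peak`, which are introduced only for this demonstration.

```python
import threading
import time

MAX_CONCURRENT = 2
slots = threading.Semaphore(MAX_CONCURRENT)
state_lock = threading.Lock()
active = 0
peak = 0

def use_resource():
    global active, peak
    with slots:  # blocks while MAX_CONCURRENT threads are already inside
        with state_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)  # pretend to use one instance of the resource
        with state_lock:
            active -= 1

threads = [threading.Thread(target=use_resource) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(peak)  # never more than 2
```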

The example below simulates a candy machine with a limited supply and uses a semaphore to keep track of the candy.

from atexit import register
from random import randrange
from threading import Semaphore, BoundedSemaphore, Lock, Thread
from time import sleep, ctime
import threading

"""
# This class overrides __len__ to return the semaphore's current value
class MySemaphore(BoundedSemaphore):
    def __len__(self):
        return self._value
candy = MySemaphore(5)
print(len(candy))
"""
lock = Lock()
MAX = 5

def refill():
    lock.acquire()
    print('Refilling candy...')
    try:
        candyTray.release()
    except ValueError:
        print('Full, skipping')
    else:
        print('OK, current candy num is %d' % candyTray._value)  # _value is an undocumented internal attribute
    lock.release()

def buy():
    lock.acquire()
    print('Buying candy...')
    if candyTray.acquire(False):  # non-blocking: returns False at once if the count is 0
        print('OK, current candy num is %d' % candyTray._value)
    else:
        print('Empty, skipping')
    lock.release()

def producer(loops):
    for i in range(loops):
        refill()
        sleep(randrange(3))

def consumer(loops):
    for i in range(loops):
        buy()
        sleep(randrange(3))

def _main():
    print('Starting at', ctime())
    nloops = randrange(2, 6)
    print('THE CANDY MACHINE (full with %d bars)!' % MAX)
    # Buyer Thread
    buyer = Thread(target=consumer, args=(randrange(nloops, nloops+MAX+2), ))
    buyer.start()
    # Vendor Thread
    vendor = Thread(target=producer, args=(nloops, ))
    vendor.start()
    for t in [buyer, vendor]:
        t.join()

@register
def _atexit():
    print('All DONE at:', ctime())

if __name__ == '__main__':
    print('-------BoundedSemaphore-------')
    candyTray = BoundedSemaphore(MAX)
    _main()
    print('-------Semaphore------')
    candyTray = Semaphore(MAX)
    _main()

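Lines 7-14, after the imports, derive a class that redefines __len__ so that len() returns the number of resources currently available in the semaphore. These lines sit inside a string literal, so they are not executed. The class makes the remaining count easier to read. Note that it, like the rest of the example, relies on the undocumented internal attribute _value rather than a public API.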

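Lines 15-16 create the lock and set the initial semaphore value, which models the number of candies in the machine at the start. With a bounded semaphore the number of candies may not exceed this initial value, i.e. there are at most 5.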

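Lines 18-27 define refill(). It acquires the lock and then releases the semaphore once, adding 1 to it (the machine is restocked with one candy). It catches the ValueError that a bounded semaphore raises when a release would push the value above the initial value.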

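Lines 29-36 define buy(). It acquires the lock and then acquires the semaphore once, subtracting 1 from it (a customer buys one candy). Because acquire(False) is non-blocking, when the value is 0 it returns False immediately instead of blocking, and the function prints Empty.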

Lines 38-46 define the producer and consumer functions, which restock and buy candy respectively at random intervals.

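Lines 48-63 define the main function and the exit handler. The main function starts the buyer and vendor threads and calls join() so that it returns only after both threads have finished.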

Output

-------BoundedSemaphore-------
Starting at Tue Aug 1 16:53:32 2017
THE CANDY MACHINE (full with 5 bars)!
Buying candy...
OK, current candy num is 4
Refilling candy...
OK, current candy num is 5
Refilling candy...
Full, skipping
Refilling candy...
Full, skipping
Buying candy...
OK, current candy num is 4
Buying candy...
OK, current candy num is 3
Refilling candy...
OK, current candy num is 4
Buying candy...
OK, current candy num is 3
Refilling candy...
OK, current candy num is 4
Buying candy...
OK, current candy num is 3
-------Semaphore------
Starting at Tue Aug 1 16:53:38 2017
THE CANDY MACHINE (full with 5 bars)!
Buying candy...
OK, current candy num is 4
Refilling candy...
OK, current candy num is 5
Buying candy...
OK, current candy num is 4
Refilling candy...
OK, current candy num is 5
Buying candy...
OK, current candy num is 4
Refilling candy...
OK, current candy num is 5
Refilling candy...
OK, current candy num is 6
Buying candy...
OK, current candy num is 5
Buying candy...
OK, current candy num is 4
Buying candy...
OK, current candy num is 3
Buying candy...
OK, current candy num is 2
Buying candy...
OK, current candy num is 1

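The output shows the difference between the two machines. With the bounded semaphore, trying to raise the semaphore (the candy count) above its initial value raises an exception, which the code turns into a "machine is full" message. With an ordinary semaphore, the value (the candy count) can exceed the initial value and has no upper bound.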
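The two behaviours can be checked directly. This sketch over-releases a BoundedSemaphore and a plain Semaphore, both with an initial value of 2. It then drains the plain one with non-blocking `acquire(blocking=False)` calls, the same mode buy() uses via acquire(False).

```python
import threading

bounded = threading.BoundedSemaphore(2)
try:
    bounded.release()  # would raise the counter above its initial value of 2
except ValueError:
    bounded_raised = True
else:
    bounded_raised = False

plain = threading.Semaphore(2)
plain.release()  # no upper bound: the counter is now 3

# Non-blocking acquire returns False instead of waiting once the counter reaches 0
taken = 0
while plain.acquire(blocking=False):
    taken += 1

print(bounded_raised, taken)  # True 3
```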
