源码之Queue

时间:2024-04-21 09:06:58

看源码可以把python看得更透,更懂,想必也是开发人员的必经之路。

现在有个任务,写个线程池。使用Queue就能写一个最简单的,下面就来学学Queue源码。

源码之Queue

class Queue:
"""Create a queue object with a given maximum size.
If maxsize is <= , the queue size is infinite.

告诉你创建一个给出长度的队列,如果长度不大于0那么队列长度将变成无限。

Queue构造方法

 def __init__(self, maxsize=0):
self.maxsize = maxsize #定义maxsize字段为最大值字段
self._init(maxsize) #调用_init方法 # mutex must be held whenever the queue is mutating. All methods
# that acquire mutex must release it before returning.
# is shared between the three conditions, so acquiring and
# releasing the conditions also acquires and releases mutex.
'''
当队列正在改变时,锁必须被持有,所有获得锁的方法必须在返回之前释放它。
锁在三种条件下被共享,所以获取和释放条件也就获取和释放锁。
''' self.mutex = _threading.Lock() #定义mutex字段为一个锁对象 # Notify not_empty whenever an item is added to the queue; a
# thread waiting to get is notified then.
'''
当有一项被加入进队列时通知非空,然后通知一个线程将被等待得到
''' self.not_empty = _threading.Condition(self.mutex) #返回一个Condition对象 # Notify not_full whenever an item is removed from the queue;
# a thread waiting to put is notified then.
'''
当一项被移除出队列时通知未满,然后通知一个线程等待被放进队列
''' self.not_full = _threading.Condition(self.mutex) #返回一个Condition对象 # Notify all_tasks_done whenever the number of unfinished tasks
# drops to zero; thread waiting to join() is notified to resume
'''
在未完成任务的数量被删除至0时,通知所有任务完成
''' self.all_tasks_done = _threading.Condition(self.mutex) #返回一个Condition对象
self.unfinished_tasks = 0 #定义未完成任务数量

解析:

  将maxsize参数传递给了_init()方法,后面再看这个方法,它其实是创建了一个deque对象(双管道)。

之后创建了一个锁对象,又通过锁对象创建了3个Condition对象。关于Condition对象,它属于线程的领域,后面介绍。

类Queue中的方法:

1.task_done

 def task_done(self):
"""Indicate that a formerly enqueued task is complete. Used by Queue consumer threads. For each get() used to fetch a task,
a subsequent call to task_done() tells the queue that the processing
on the task is complete. If a join() is currently blocking, it will resume when all items
have been processed (meaning that a task_done() call was received
for every item that had been put() into the queue). Raises a ValueError if called more times than there were items
placed in the queue.
表明一个以前的队列任务完成了 使用队列消费者进程。对于每一个get()用来得到任务,随后调用task_done 方法告诉队列任务的处理已经完成了
如果一个join正在阻塞,当所有项都已经被加工了他将重新占用。
如果调用次数超过队列中放置的项目,则会抛ValueError异常
"""
self.all_tasks_done.acquire() #获得锁
try:
unfinished = self.unfinished_tasks - 1 #判断队列中一个线程的任务是否全部完成
if unfinished <= 0: #是则进行通知,或在过量调用时报异常
if unfinished < 0:
raise ValueError('task_done() called too many times')
self.all_tasks_done.notify_all()
self.unfinished_tasks = unfinished #否则未完成任务数量-1
finally:
self.all_tasks_done.release() #最后释放锁

解析:

  这个方法判断队列中一个线程的任务是否全部完成,首先会通过all_tasks_done对象获得锁,如果是则进行通知,最后释放锁。nodify这个方法在Condition对象中研究。

2.join

 def join(self):
"""Blocks until all items in the Queue have been gotten and processed. The count of unfinished tasks goes up whenever an item is added to the
queue. The count goes down whenever a consumer thread calls task_done()
to indicate the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks. 阻塞,直到队列中所有项都被得到和处理 当一个项目被添加到队列中时,未完成任务上升。当一个消费线程调用task_done方法表明这项被恢复且所有工作都完成时数量下降。
当未完成为0时,join解除阻塞
""" self.all_tasks_done.acquire()
try:
while self.unfinished_tasks: #如果有未完成的任务,将调用wait()方法等待
self.all_tasks_done.wait()
finally:
self.all_tasks_done.release()

解析:

  阻塞方法,当队列中有未完成进程时,调用join方法来阻塞,直到他们都完成。wait方法后面看。

3.qsize

 def qsize(self):
"""Return the approximate size of the queue (not reliable!).
返回一个估计的队列大小,不可靠
"""
self.mutex.acquire()
n = self._qsize() #这个方法返回了deque对象的长度
self.mutex.release()
return n

解析:

  后面会提_qsize这个方法,它返回了deque对象的长度。这个一个估计值,并不可靠。

4.empty

 def empty(self):
"""Return True if the queue is empty, False otherwise (not reliable!).
当队列为空时返回True,否则False
"""
self.mutex.acquire()
n = not self._qsize() #如果长度为0返回True,否则False
self.mutex.release()
return n

解析:

  判断队列长度是否为空,也是基于qsize的方法,所以仍然不可靠。

5.full

 def full(self):
"""Return True if the queue is full, False otherwise (not reliable!)."""
self.mutex.acquire()
n = 0 < self.maxsize == self._qsize()
self.mutex.release()
return n

没啥说的了,判断队列是否满了

6.put

 def put(self, item, block=True, timeout=None):
"""Put an item into the queue. If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until a free slot is available. If 'timeout' is
a non-negative number, it blocks at most 'timeout' seconds and raises
the Full exception if no free slot was available within that time.
Otherwise ('block' is false), put an item on the queue if a free slot
is immediately available, else raise the Full exception ('timeout'
is ignored in that case). 如果可选的参数block和timeout为空(默认),如果必要的话阻塞直到有一个空闲位置可用。
如果timeout是一个非负的数字,它将阻塞至多这个数字的秒数并且如果没有可用位置时报Full异常。
另外,block 为false时,如果有可用的位置将会放一项进去,否则报Full异常 """
self.not_full.acquire() #not_full获得锁
try:
if self.maxsize > 0: #如果队列长度有限制
if not block: #如果没阻塞
if self._qsize() == self.maxsize: #如果队列满了抛异常
raise Full
elif timeout is None: #有阻塞且超时为空,等待
while self._qsize() == self.maxsize:
self.not_full.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else: #如果有阻塞,且超时非负时,结束时间=当前时间+超时时间
endtime = _time() + timeout
while self._qsize() == self.maxsize:
remaining = endtime - _time()
if remaining <= 0.0: #到时后,抛异常
raise Full
self.not_full.wait(remaining)
self._put(item) #调用_put方法
self.unfinished_tasks += 1 #未完成任务+1
self.not_empty.notify() #通知非空
finally:
self.not_full.release() #not_full释放锁

解析:

  默认情况下block为True,timeout为None。如果队列满则会等待,未满则会调用_put方法将进程加入deque中(后面介绍),并且未完成任务加1还会通知队列非空。

如果设置block参数为Flase,队列满时则会抛异常。如果设置了超时那么在时间到之前进行阻塞,时间一到抛异常。这个方法使用not_full对象进行操作。

7.put_nowait

 def put_nowait(self, item):
"""Put an item into the queue without blocking. Only enqueue the item if a free slot is immediately available.
Otherwise raise the Full exception.
"""
return self.put(item, False)

就是put方法的block设置成Fasle的效果,没啥说的。

8.get

 def get(self, block=True, timeout=None):
"""Remove and return an item from the queue. If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until an item is available. If 'timeout' is
a non-negative number, it blocks at most 'timeout' seconds and raises
the Empty exception if no item was available within that time.
Otherwise ('block' is false), return an item if one is immediately
available, else raise the Empty exception ('timeout' is ignored
in that case).
"""
self.not_empty.acquire() #not_empty获得锁
try:
if not block: #不阻塞时
if not self._qsize(): #队列为空时抛异常
raise Empty
elif timeout is None: #不限时时,队列为空则会等待
while not self._qsize():
self.not_empty.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = _time() + timeout
while not self._qsize():
remaining = endtime - _time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
item = self._get() #调用_get方法,移除并获得项目
self.not_full.notify() #通知非满
return item #返回项目
finally:
self.not_empty.release() #释放锁

解析:

  可以看出逻辑同put类似,参数默认情况下队列空了则会等待,否则将会调用_get方法(往下看)移除并获得一个项,最后返回这个项。这个方法使用not_empty对象进行操作。

9.get_nowait

block=False版的get方法

10._init()

def _init(self, maxsize):
self.queue = deque()

生成了一个deque对象,这个deque对象才是真正操作队列添加或者删除一项的源头,所以有必要读一下这个类。

源码之deque

class deque(object):
"""
deque([iterable[, maxlen]]) --> deque object Build an ordered collection with optimized access from its endpoints.
"""

告诉你这个类是创建一个能优化访问它的端点的有序的集合

deque构造方法:

 def __init__(self, iterable=(), maxlen=None): # known case of _collections.deque.__init__
"""
deque([iterable[, maxlen]]) --> deque object Build an ordered collection with optimized access from its endpoints.
# (copied from class doc)
"""
pass

构造函数中有两个参数,迭代器和一个最大长度,默认都为空。

类Queue/deque中的方法:

类中方法有很多,先挑Queue中遇到的看。

11._qsize

def _qsize(self, len=len):
return len(self.queue)

这个方法直接得到deque对象的长度并返回

12._put

def _put(self, item):
self.queue.append(item)

这个方法调用了deque的append方法:

def append(self, *args, **kwargs): # real signature unknown
""" Add an element to the right side of the deque. """
pass

在deque队列右边添加一个元素。注意,这个方法只有一句pass,并没有实现其功能的代码。但是看到 real signature unknown(真正未知的签名)这句话,我觉得就是把代码放在我看不见的地方了。

13._get

def _get(self):
return self.queue.popleft()

这个方法调用了deque的popleft方法:

def popleft(self, *args, **kwargs): # real signature unknown
""" Remove and return the leftmost element. """
pass

从最左端删除并返回一个元素,这有点像序列中的pop方法。同样的,也是一句 real signayure unknown

到此Queue类的源码已经看完了,不过现在对Queue的原理只了解了一部分,接下来看看锁和Condition对象在队列中是如何工作的。

源码之Condition

class _Condition(_Verbose):
"""Condition variables allow one or more threads to wait until they are
notified by another thread.
"""

这个类继承了Vserbose类,还告诉你条件变量允许一个或多个线程等待直到他们被其他线程通知

Condition构造方法:

 def __init__(self, lock=None, verbose=None):
_Verbose.__init__(self, verbose) #调用了父类的构造方法
if lock is None:
lock = RLock() #获得一个RLock对象
self.__lock = lock
# Export the lock's acquire() and release() methods
self.acquire = lock.acquire #两个字段分别引用锁的获得和释放
self.release = lock.release
# If the lock defines _release_save() and/or _acquire_restore(),
# these override the default implementations (which just call
# release() and acquire() on the lock). Ditto for _is_owned().
try:
self._release_save = lock._release_save
except AttributeError:
pass
try:
self._acquire_restore = lock._acquire_restore
except AttributeError:
pass
try:
self._is_owned = lock._is_owned
except AttributeError:
pass
self.__waiters = []

这个方法中执行了父类的构造方法,然后获得了RLock对象,又将其方法的引用赋给了多个字段。来看Verbose的构造方法:

class _Verbose(object):
def __init__(self, verbose=None):
pass

什么也没做,下面来看RLock。

源码之RLock

class _RLock(_Verbose):
"""A reentrant lock must be released by the thread that acquired it. Once a
thread has acquired a reentrant lock, the same thread may acquire it
again without blocking; the thread must release it once for each time it
has acquired it.
"""

说什么一个进程得到一个锁后必须释放它

类RLock构造方法

def __init__(self, verbose=None):
_Verbose.__init__(self, verbose)
self.__block = _allocate_lock()
self.__owner = None
self.__count = 0

又调用了Vserbose的__init__(),同样是pass。还调用了_allocate_lock()方法。

def allocate_lock(): # real signature unknown; restored from __doc__
"""
allocate_lock() -> lock object
(allocate() is an obsolete synonym) Create a new lock object. See help(LockType) for information about locks.
"""
pass

又是real signature unknown,看注释:创建一个新的锁对象。它就干了这个。

还有两个字段,所有者和数量。接下来就看Condition中用到的RlLock的3个的方法。

1._release_save

def _release_save(self):
if __debug__:
self._note("%s._release_save()", self)
count = self.__count
self.__count = 0
owner = self.__owner
self.__owner = None
self.__block.release()
return (count, owner)

字面意思为保存,释放。如果debug为真,将本方法的返回值传递给了note方法。然后将owner和count字段保存,返回,重置。那么debug为何,note方法又做了什么?

__debug__在__builtin__.py中,默认为True。来看note方法

 _VERBOSE = False

 if __debug__:

     class _Verbose(object):

         def __init__(self, verbose=None):
if verbose is None:
verbose = _VERBOSE
self.__verbose = verbose def _note(self, format, *args):
if self.__verbose:
format = format % args
# Issue #4188: calling current_thread() can incur an infinite
# recursion if it has to create a DummyThread on the fly.
ident = _get_ident()
try:
name = _active[ident].name
except KeyError:
name = "<OS thread %d>" % ident
format = "%s: %s\n" % (name, format)
_sys.stderr.write(format)

__init__中将self._verbose赋值成了False,而在_note中如果self._verbose为假那么这个方法就啥也没执行。

如果self._verbose为真,就会调用了_get_ident()方法,简单看了下这个方法,解释为:返回一个非0整数,在其他同时存在的线程中唯一标识 一个进程。

所以这个方法干的活就是返回锁的拥有者,数量,然后释放锁。

2._acquire_restore

 def _acquire_restore(self, count_owner):
count, owner = count_owner
self.__block.acquire()
self.__count = count
self.__owner = owner
if __debug__:
self._note("%s._acquire_restore()", self)

顾名思义,获得锁,然后将owner和count作为参数赋值给对象,恢复字段的值。

3._is_owned

def _is_owned(self):
return self.__owner == _get_ident()

判断owner的值与get_ident的返回值是否相等。其返回值就是唯一标识线程的一个整数,那么这个比较的意义就是确定这个锁对象的持有者是否为这个进程。

类Condition的方法

condition中主要用到了的wait,nodify方法,让其他线程等待或唤醒它们。由于时间有限,这部分内容待续。

至此,Queue源码已经看的差不多了,总结一下

Queue调用RLock对象使线程获得和释放锁,并记录线程的信息。调用Condition对象控制线程等待或是唤醒它们。然后deque对象负责操作管道中的元素。

心得

刚开始读源码时方法和类间调来调去有点难以下手,不过多读几遍就差不多搞明白了。我的思路是先攻主体,比如就先看Queue里的代码,遇到了什么deque,Condition的可以先跳过,之后再逐个击破。最后回过头来把各个部分组合整理一下,再画个类图什么的就更容易明白了。

最后附上一张类图。

源码之Queue