最近在做一个东西的时候发现需要用到循环队列,实现先进先出(FIFO),不断往里面添加数据,当达到某个限定值时,最先进去的出去,然后再添加。之后需要对队列里面的内容进行一个筛选,作其他处理。首先我想到了python的Queue模块,先简单的介绍一下,具体的可以参考Queue。
一、Queue模块
Python queue模块有三种队列及构造函数:
1、Python queue模块的FIFO队列先进先出。 class queue.queue(maxsize)
2、LIFO类似于堆栈,即先进后出。 class queue.Lifoqueue(maxsize)
3、还有一种是优先级队列级别越低越先出来。 class queue.Priorityqueue(maxsize)
此包中的常用方法(q =queue.queue()):
q.qsize() 返回队列的大小
q.empty() 如果队列为空,返回True,反之False
q.full() 如果队列满了,返回True,反之False
q.get(block=True, timeout=None]) 从队列中返回并删除一个元素,timeout等待时间
q.get_nowait() 相当q.get(False)
q.put(item, block=True, timeout=None)非阻塞 q.put(item) 写入队列,timeout等待时间
q.put_nowait(item) 相当q.put(item, False)
q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
q.join() 实际上意味着等到队列为空,再执行别的操作
这里引入Queue模块就可以实现FIFO了,当我要提取队列里面的数据的时候,我得利用get()方法先把数据提取出来,然后存入一个新的数组,由于我要隔一段时间对里面的数据进行提取,而get()方法提取的时候会删除对应的元素,这样有点儿不方便。因此我自己写了一个类(写的不好的地方,大神们可以告诉我,不喜勿喷hh)
二、自定义一个类(circular_queue.py)
# 定义队列类
class MyQueue(object):
def __init__(self, size):
self.size = size # 定义队列长度
self.queue = [] # 存储队列 列表 def __str__(self):
# 返回对象的字符串表达式,方便查看
return str(self.queue) def inQueue(self, n):
# 入队
if self.isFull():
return -1
self.queue.append(n) # 列表末尾添加新的对象 def outQueue(self):
# 出队
if self.isEmpty():
return -1
firstelement = self.queue[0] # 删除队头元素
self.queue.remove(firstelement) # 删除队操作
return firstelement def delete(self, n):
# 删除某元素
element = self.queue[n]
self.queue.remove(element) def inPut(self, n, m):
# 插入某元素 n代表列表当前的第n位元素 m代表传入的值
self.queue[n] = m def getSize(self):
# 获取当前长度
return len(self.queue) def getnumber(self, n):
# 获取某个元素
element = self.queue[n]
return element def isEmpty(self):
# 判断是否为空
if len(self.queue) == 0:
return True
return False def isFull(self):
# 判断队列是否满
if len(self.queue) == self.size:
return True
return False
三、测试
在文件circular_queue.py中类的下头继续添加如下代码
queue = MyQueue(5) # 定义一个大小为5的队列
for i in range(8):
# 先判断队列是否为满
if not queue.isFull():
queue.inQueue(i)
else:
# 先出队再添加
queue.outQueue()
queue.inQueue(i)
print(queue)
运行结果如下
可以看出已经实现了,当然你也可以试试其他几种方法,比如提取元素,获取队列大小等等。接下来就可以像操作列表对元素进行提取,并且不会删除元素。
for i in range(queue.getSize()):
item =queue.getnumber(i)
print(item)
当然也可以把circular_queue。py文件单独保存,然后到其他文件中引入,放入同一文件夹下,新建test.py,然后运行试试:
from circular_queue import *
import time def fun2(num):
num += 1
return num def fun1(num, res):
while True:
num = fun2(num)
# 先判断队列是否为满
if not queue.isFull():
queue.inQueue([num, res])
else:
# 先出队再添加
queue.outQueue()
queue.inQueue([num, res])
print(queue)
time.sleep(2) if __name__ == "__main__":
queue = MyQueue(5)
fun1(0, '-aaa-')