Python 3.X 实现定时器 Timer,制作抽象的Timer定时器基类

时间:2022-07-02 01:21:49

Python 在不依赖第三方库的前提下,对于定时器的实现并不是很完美,但是这不意味着我们无法实现。

阅读了网上的一些资料,得出一些结论,顺手写了一个基类的定时器(Python3)

BaseTimer:

 # -*- coding: utf-8 -*-

 from abc import ABCMeta, abstractmethod
import time
import threading class BaseTimer(threading.Thread):
"""
基础定时器抽象类,可以随意继承本类并且实现exec抽象方法即可定时产生任务
"""
__metaclass__ = ABCMeta
def __init__(self,howtime=1.0,enduring=True):
"""
howtime 每次定时的时间
enduring 是否是一个持久性的任务,用这个可以控制开关
""" self.enduring = enduring
self.howtime = howtime
threading.Thread.__init__(self) def run(self):
time.sleep(self.howtime) #至少执行一次 任务
self.exec()
while self.enduring: #是否持久,或者是否进行运行
time.sleep(self.howtime)
self.exec() #每次开始执行任务 @abstractmethod
def exec(self):
"""抽象方法,子类实现"""
pass def destroy(self):
"""销毁自身"""
self.enduring = False
del self def stop(self):
self.enduring = False def restart(self):
self.enduring = True def get_status(self):
return self.enduring

如何使用?


我们来建立一个新的任务,这个任务是过一段时间就输出:

 class TimerMask(BaseTimer):
"""定时任务类,不同的业务逻辑"""
def __init__(self,howtime=1.0,enduring=True):
BaseTimer.__init__(self,howtime,enduring)
self.ws=0 def exec(self):
print("HelloWorld!")
self.ws = self.ws + 1 #这里是过0.8秒输出一次
if self.ws >5:
self.destroy()

加入如下:

 if __name__ == "__main__":
Q_w = 0
w = TimerMask(howtime=0.8)
print("-")
w.start()
#这里线程输出这些,做其他事情的
while True:
time.sleep(0.4) #0.4秒
print("- ",Q_w," - WMask:",w)
Q_w += 1
pass

输出:

Python 3.X 实现定时器 Timer,制作抽象的Timer定时器基类


于是你可能会想问,那岂不是每个不同的行为都要写一个继承了BaseTimer的类来做事呢,其实不然,你可以写个函数调用的TimerFunc类:

 class TimerFunc(BaseTimer):
"""可传递任何函数的定时任务类"""
def __init__(self,func,howtime=1.0,enduring=True):
BaseTimer.__init__(self,howtime,enduring)
self.func = func def exec(self):
self.func() #调用函数 #使用方法:
def doing():
print("Hello") w = TimerFunc(doing)
w.start()

输出:"Hello",并且会每隔1秒执行一次

是不是觉得可以做一堆事情了?你可以*发挥,继承BaseTimer类

 w = TimerFunc(doing,5,False) #这样就可以定义延迟5秒使用了~
w.start()

在搜索资料的时候,找到了网上大部分的实现方法,其实都差不多,感兴趣你可以看看:

 import threading ,time
from time import sleep, ctime
class Timer(threading.Thread):
"""
very simple but useless timer.
"""
def __init__(self, seconds):
self.runTime = seconds
threading.Thread.__init__(self)
def run(self):
time.sleep(self.runTime)
print ("Buzzzz!! Time's up!") class CountDownTimer(Timer):
"""
a timer that can counts down the seconds.
"""
def run(self):
counter = self.runTime
for sec in range(self.runTime):
print (counter)
time.sleep(1.0)
counter -= 1
print ("Done") class CountDownExec(CountDownTimer):
"""
a timer that execute an action at the end of the timer run.
"""
def __init__(self, seconds, action, args=[]):
self.args = args
self.action = action
CountDownTimer.__init__(self, seconds)
def run(self):
CountDownTimer.run(self)
self.action(self.args) def myAction(args=[]):
print ("Performing my action with args:")
print (args) if __name__ == "__main__":
t = CountDownExec(3, myAction, ["hello", "world"])
t.start()
print("")
 '''
使用sched模块实现的timer,sched模块不是循环的,一次调度被执行后就Over了,如果想再执行,可以使用while循环的方式不停的调用该方法
Created on 2013-7-31 @author: Eric
'''
import time, sched #被调度触发的函数
def event_func(msg):
print("Current Time:", time.strftime("%y-%m-%d %H:%M:%S"), 'msg:', msg) def run_function():
#初始化sched模块的scheduler类
s = sched.scheduler(time.time, time.sleep)
#设置一个调度,因为time.sleep()的时间是一秒,所以timer的间隔时间就是sleep的时间,加上enter的第一个参数
s.enter(0, 2, event_func, ("Timer event.",))
s.run() def timer1():
while True:
#sched模块不是循环的,一次调度被执行后就Over了,如果想再执行,可以使用while循环的方式不停的调用该方法
time.sleep(1)
run_function() if __name__ == "__main__":
timer1()

感谢耐心阅读,希望对你有帮助。