apscheduler -定时任务

时间:2023-03-08 16:26:04

https://apscheduler.readthedocs.io/en/latest/userguide.html

简单的使用方式为:

from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()

sched.add_job(ff_task,'cron',hour='0-1,8-23',minute=28)
sched.add_job(avor_task, 'cron', hour='2-7', minute='0')
sched.add_job(vor_task,'cron',hour='0-1,8-23',minute='*/3')
try:
    sched.start()
except (KeyboardInterrupt, SystemExit):
    sched.shutdown()

apscheduler包含四个组件 :triggers触发器 , schedulers 调度器 ,job stores任务存储 , executors执行器

triggers 包含计划的逻辑,每个任务都有自己的触发器,来决定任务下次被触发的时间,除了类型的初始配置,触发器是完全无状态的

job stores 用户存储被计划的任务,默认存储在内存中,也可存储在各种数据库中。当一个任务被存储在持久化的存储库中时将被序列化,然后在加载到内存时被反序列化。

调度器之间不应该共用任务存储。

executors 用于处理任务的执行。 让预定的任务在线程或进程池中启动,当任务结束后,执行器通知调度器什么时候进行合适的事件。

scheduler用于将所有的组建凝结起来。一个应用中通常只有一个调度器

BlockingScheduler:用于当scheduler是进程里唯一运行的程序

BlockgroundScheduler:当你没有使用以下任何一个调度器,并且希望scheduler在应用的后台运行时适用

AsyncIOScheduler :当应用中使用 asyncio module(异步io模块)时适用

GeventScheduler :当应用中使用 gevent时适用

TornadoScheduler :当构建基于Tornado的应用时适用

TwistedScheduler: 当构建基于 TwistedScheduler的应用时适用

QtScheduler: 当构建 Qt 应用时适用

存储器选择

如果每次开启应用时都会重建任务计划,则可以使用默认的内存存储

如果你希望当scheduler重启或应用宕机时,任务可以继续的按计划执行 ,存储器的选择通常取决于开发环境中使用的工具。

如果你没什么要求则推荐使用SQLAlchemyJobStore,以PostgreSQL作为存储后端

执行器

通常取决于上述组建的选择,不过,通常默认的执行器 ThreadPoolExecutor也足够大多数任务了。如果任务中设计了cpu密集型操作,应该考虑ProcessPoolExecutor ,来使用多核CPU。

你甚至可以同时使用这两个执行器,将进程池执行器添加为备选执行器

触发器

trigger决定任务执行时间的逻辑。ASPcheduler内置三种触发器:

date : 一次性的固定时间点执行任务

interval: 按周期循环执行任务

cron: 语法类似linux的定时任务cron

要组合多种triggers,也可实现 ,combining triggers ,见 https://apscheduler.readthedocs.io/en/latest/modules/triggers/combining.html#module-apscheduler.triggers.combining

ASPchedulre提供多种配置方式 ,可以通过一个配置的dict或者作为可选关键字参数传入。或者也可先实例化scheduler,添加任务,然后再配置scheduler,这种方式取得最大的灵活性。

完整的而配置说明见 https://apscheduler.readthedocs.io/en/latest/modules/schedulers/base.html#apscheduler.schedulers.base.BaseScheduler

例如,选用BackgroundScheduler ,使用默认的job store 和默认的executor:

from apscheduler.schedulers.background import BackgroundScheduler

scheduler=BackgroundScheduler()

这将会用名为”default"的MemoryJobStore ,名为“default"的ThreadPoolExecutor,默认线程池最大量为10的配置创建一个 BackgroundScheduler 。

例1

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler

from apscheduler.jobstores.mongodb import MongoDBJobStore

from apscheduler.executors.pool import ThreadPoolExecutor,ProcessPoolExecutor

jobstores={

'mongo':MongoDBJobStore(),

'default':SQLAlchemyJobStore(url='sqlite://jobs.sqlite')

}

executors={

'default':ThreadPoolExecutor(20),

'processpool':ProcessPoolExector(5)

}

job_defaults={

'coalesce':False,

'max_instances';3

}

scheduler=BackgroundScheduler(jonstores=jobstores,executors=executors,job_defaults,timezone=utc)

例2

from apsshceduler.schedulers.background import BackgroundScheduler

scheduler=BackgroundScheduler({

'apscheduler.jonstores.mongo':{

  'type':'mongodb'

  },

'apscheduler.jobstores.default':{

  'type':'sqlalchemy',

  'url':'sqlite:///jobs.sqlite'

  },

'apscheduler.executors.default':{

  'class':'apscheduler.executors.pool:ThreadExecutor',

  'max_workers':'20'

  },

'apscheduler.executors.processpool':{

  'type':'processpool',

  'max_workers':'5'

  },

'apscheduler.job_defaults.coalesce':'false',

'apscheduler.job_defaults.max_instances':'3',

'apscheduler.timezone':'UTC',

  }

)

例3

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

from apscheduler.executors.pool import ProcessPoolExecutor

jobstores={

'mongo':{'type':'mongodb'},

'default':SQLAlchemyJobStore(url='sqlite://jobs.sqlite')

}

executors={

'default':{'type':'threadpool','max_workers':20},

'processpool':ProcessPoolExecutor(max_workers=5)

}

job_defaults={

'coalesce':False,

'max_instances':3

}

scheduler=BackgroundScheduler()

scheduler,configure(jobstores=jobstores,executors=executors,job_defaults=job_defaults,timezone=utc)