Flask 与 Celery 在 windows 下的集成问题

时间:2023-03-08 20:12:01
Flask 与 Celery 在 windows 下的集成问题

Flask 与 Celery 在 windows 下的集成问题

所有的 Web 框架内部的视图中不适合执行需要长时间运行的任务,包括 Flask 、Django 等。这类型的任务会阻塞 Web 的响应,导致用户在等待执行结果,对用户不友好。发送邮件通知、数据统计等任务,执行时间长,应该自动或由用户触发,然后在后台执行。

后台执行的方式有多种,多线程、多进程都可以。但要注意,一般不要直接在 Web 框架中触发多线程任务,因为无法确定 web 服务器会否进行回收线程导致任务中止,不够可靠。可以通过独立的服务进程,自己写线程任务或多进程任务完成这些操作。更加可靠和方便的方法是使用 Celery 和消息队列的方式,让 Celery 来管理相关的任务,同时也便于监控。

Flask 的在线文档有一个简单的方法集成 Celery 和 Flask Celery Based Background Tasks 。这个方法在 Unix-link 系统中工作正常,在 windows 上以工厂模式工作时,会因为 windows 的进程创建机制问题工作异常,包括像 Flask-CeleryExt / Flask-Celery-Helper 都有点问题,Celery 的 worker 子进程无法加载 Flask 的 App Context 。

这里提供一个各个平台都可行的方法,假设布局如下:

  1. app.py

    这个文件以工厂模式创建 Flask App

     from flask import Flask
    from flask.ext.sqlalchmey import SQLAlchemy db = SQLAlchemy() def create_app(config):
    app = Flask(__name__)
    app.config.from_object(config) db.init_app(app) # 加载 blueprint return app
  2. tasks.py

    这个文件定义 celery 任务

     from celery import current_app as current_celery
    from app import db @current_celery.task()
    def say_hello(name):
    # 数据库操作,db.session...
    return 'Hello, %s!' % name
  3. config.py

    配置文件

     class config(object):
    # ...
    CELERY_BROKER_URL = 'sqla+sqlite:///celery-broker.sqlite'
    CELERY_RESULT_BACKEND = 'db+sqlite:///celery-result.sqlite'
    CELERY_ALWAYS_EAGER = True
  4. manage.py

    这个文件作为程序入口

     from flask.ext.script import Manager
    from app import create_app
    from celery import Celery, Task
    from config import config app = create_app(config)
    manager = Manager(app) class AppContextTask(Task):
    abstract = True def __call__(self, *args, **kwargs):
    if config.CELERY_ALWAYS_EAGER:
    return super(BoundTask, self).__call__(*args, **kwargs)
    else:
    with app.app_context():
    return super(BoundTask, self).__call__(*args, **kwargs) celery = Celery(app.import_name, broker=config.CELERY_BROKER_URL),
    set_as_current=True, task_cls= AppContextTask)
    celery.conf.update(app.config) # ... 其它 if __name__ == '__main__':
    manager.run()

在调试时,可把 CELERY_ALWAYS_EAGER 设置为 True ,那么所有的调用会变为实时调用,不通过 Broker 进行,因此在 AppContextTask 中,就不需要重新注入 App Context 。

这种方法在 windows 、linux 和 mac 上已测试过可行,但是要注意,Flask 的 app 和 AppContextTask 必须在同一个文件一起创建,我尝试过以 Flask 扩展中 init_app 的方式处理时,发现新生成的 Celery 对象和原始 Celery 对象完全不同,无法保留原始 Flask app 的信息,不得不以这种方式处理。