celery --分布式任务队列

时间:2021-11-11 07:54:43

一、介绍

celery是一个基于python开发的分布式异步消息任务队列,用于处理大量消息,同时为操作提供维护此类系统所需的工具。
它是一个任务队列,专注于实时处理,同时还支持任务调度。如果你的业务场景中需要用到异步任务,就可以考虑使用celery

二、实例场景

1、你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情。
2、你想做一个定时任务,比如每天检测一下你们所有客户的资料,如果发现今天 是客户的生日,就给他发个短信祝福

三、优点

  • 1、简单:一但熟悉了celery的工作流程后,配置和使用还是比较简单的
  • 2、高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务
  • 3、快速:一个单进程的celery每分钟可处理上百万个任务
  • 4、灵活:几乎celery的各个组件都可以被扩展及自定制

四、入门

celery 需要一个解决方案来发送和接受消息,通常,这是以称为消息代理的单独服务的形式出现的
有以下几种解决方案,包括:
一:RabbitMQ(消息队列,一种程序之间的通信方式)
rabbitmq 功能齐全,稳定,耐用且易于安装。它是生产环境的绝佳选择。
如果您正在使用Ubuntu或Debian,请执行以下命令安装RabbitMQ:

$ sudo apt-get install rabbitmq-server

命令完成后,代理已经在后台运行,准备为您移动消息:。Starting rabbitmq-server: SUCCESS
二、redis

redis功能齐全,但在突然中止或者电源故障时更容易丢失数据

五、安装

$ pip install celery 

六、应用

创建一个tasks.py文件

from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')
@app.task
def add(x, y):
return x + y

第一个参数Celery是当前模块的名称。只有在__main__模块中定义任务时才能自动生成名称。
第二个参数是broker关键字参数,指定要使用的消息代理的URL。这里使用RabbitMQ(也是默认选项)。
您可以使用RabbitMQ amqp://localhost,或者您可以使用Redis redis://localhost。
您定义了一个名为add的任务,返回两个数字的总和。

 from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'saruman_server.settings')
app = Celery('saruman_server') # Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) @app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))

和django配合实例

七、运行celery工作服务器

您现在可以通过使用worker 参数执行我们的程序来运行worker :

celery -A tasks worker --loglevel=info

有关可用命令行选项的完整列表,请执行以下操作:

$ celery worker --help

还有其他几个可用的命令,也可以提供帮助:

$ celery help

八、调用任务

要调用我们的任务,您可以使用该delay()方法。
apply_async() 可以更好地控制任务执行

>>> from tasks import add
>>> add.delay(4, 4)

调用任务会返回一个AsyncResult实例。这可用于检查任务的状态,等待任务完成,或获取其返回值(或者如果任务失败,则获取异常和回溯)。

九、保持结果

如果您想跟踪任务的状态,Celery需要在某处存储或发送状态。有几个内置的结果后端可供选择:SQLAlchemy / Django ORM, Memcached,Redis,RPC(RabbitMQ / AMQP),以及 - 或者您可以定义自己的。
在本例中,我们使用rpc结果后端,它将状态作为瞬态消息发回。后端通过backend参数 指定Celery

app = Celery('tasks', backend='rpc://', broker='pyamqp://')

或者,如果您想使用Redis作为结果后端,但仍然使用RabbitMQ作为消息代理(一种流行的组合):

app = Celery('tasks', backend='redis://localhost', broker='pyamqp://')

现在配置了结果后端,让我们再次调用该任务。这次你将保持AsyncResult调用任务时返回的实例:

>>> result = add.delay(4, 4)

该ready()方法返回任务是否已完成处理:

>>> result.ready()
False 

十、配置

与消费类电器一样,celery不需要太多配置即可运行。它有一个输入和一个输出。输入必须连接代理,输出可以
选择到结果后端。
可以直接在应用程序上或使用专用配置模块设置配置。例如,您可以通过更改task_serializer设置来配置用于序列化任务有效负载的默认序列化程序:

app.conf.task_serializer = 'json'

如果您一次配置了许多设置,则可以使用update:

app.conf.update(
task_serializer='json',
accept_content=['json'], # Ignore other content
result_serializer='json',
timezone='Europe/Oslo',
enable_utc=True,
)

对于大型项目,建议使用专用配置模块。不鼓励硬编码周期性任务间隔和任务路由选项。将它们保存在集中位置要好得多。对于库来说尤其如此,因为它使用户能够控制其任务的行为方式。集中配置还允许您的SysAdmin在发生系统故障时进行简单的更改。
您可以通过调用app.config_from_object()方法告诉Celery实例使用配置模块:

app.config_from_object('celeryconfig')

此模块通常称为“ celeryconfig”,但您可以使用任何模块名称。
在上面的例子中,一个名为的模块celeryconfig.py必须可以从当前目录或Python路径加载。它可能看起来像这样:
celeryconfig.py:

broker_url = 'pyamqp://'
result_backend = 'rpc://' task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
 from datetime import timedelta

 import djcelery

 djcelery.setup_loader()
BROKER_URL = 'amqp://guest@localhost//' #输入
CELERY_RESULT_BACKEND = 'amqp://guest@localhost//' #返回的结果 #导入指定的任务模块
CELERY_IMPORTS = (
'fir.app.fir.tasks',
) CELERYBEAT_SCHEDULE = {
'receive_mail': {
"task": "fir.app.fir.tasks.receive_mail",
"schedule": timedelta(seconds=5),
"args": (),
},
}

要验证配置文件是否正常工作且不包含任何语法错误,您可以尝试导入它:
####################################################

python -m celeryconfig

为了演示配置文件的强大功能,您可以将行为不当的任务路由到专用队列:

celeryconfig.py:
task_routes = {
'tasks.add': 'low-priority',
}
或者不是路由它,而是可以对任务进行速率限制,这样在一分钟(10 / m)内只能处理10种此类任务:

celeryconfig.py:
task_annotations = {
'tasks.add': {'rate_limit': '10/m'}
}
如果您使用RabbitMQ或Redis作为代理,那么您还可以指示工作人员在运行时为任务设置新的速率限制:

$ celery -A tasks control rate_limit tasks.add 10/m
worker@example.com: OK
new rate limit set successfully

十一、在项目中如何使用celery

1、可以把celery配置成一个应用
2、目录结构如下:

proj/__init__.py
/celery.py
/tasks.py

3、proj/celery.py内容

from __future__ import absolute_import, unicode_literals
from celery import Celery app = Celery('proj',
broker='amqp://',
backend='amqp://',
include=['proj.tasks']) # Optional configuration, see the application user guide.
app.conf.update(
result_expires=3600,
) if __name__ == '__main__':
app.start()

4、proj/tasks.py中的内容

from __future__ import absolute_import, unicode_literals
from .celery import app @app.task
def add(x, y):
return x + y @app.task
def mul(x, y):
return x * y @app.task
def xsum(numbers):
return sum(numbers)

5、启动worker

$ celery -A proj worker -l info

输出

-------------- celery@Alexs-MacBook-Pro.local v4.0.2 (latentcall)
---- **** -----
--- * *** * -- Darwin-15.6.0-x86_64-i386-64bit 2017-01-26 21:50:24
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: proj:0x103a020f0
- ** ---------- .> transport: redis://localhost:6379//
- ** ---------- .> results: redis://localhost/
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery

django 中使用celery:参考链接:http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html#using-celery-with-django

十二、监控工具flower

如果有些任务出现问题,可以用flower工具监控(基于tornado)
安装:pip install flower

使用:
三种启动方式

celery flower
celery flower --broker
python manage.py celery flower #就能读取到配置里的broker_url 默认是rabbitmq

打开运行后的链接
打开worker
python manage.py celery worker -l info

celery --分布式任务队列的更多相关文章

  1. Celery 分布式任务队列快速入门

    Celery 分布式任务队列快速入门 本节内容 Celery介绍和基本使用 在项目中如何使用celery 启用多个workers Celery 定时任务 与django结合 通过django配置cel ...

  2. day21 git & github + Celery 分布式任务队列

    参考博客: git & github 快速入门http://www.cnblogs.com/alex3714/articles/5930846.html git@github.com:liyo ...

  3. Celery 分布式任务队列快速入门 以及在Django中动态添加定时任务

    Celery 分布式任务队列快速入门 以及在Django中动态添加定时任务 转自 金角大王 http://www.cnblogs.com/alex3714/articles/6351797.html ...

  4. 【转】Celery 分布式任务队列快速入门

    Celery 分布式任务队列快速入门 本节内容 Celery介绍和基本使用 在项目中如何使用celery 启用多个workers Celery 分布式 Celery 定时任务 与django结合 通过 ...

  5. Celery -- 分布式任务队列 及实例

    Celery 使用场景及实例 Celery介绍和基本使用 在项目中如何使用celery 启用多个workers Celery 定时任务 与django结合 通过django配置celery perio ...

  6. Celery 分布式任务队列入门

    一.Celery介绍和基本使用 Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery ...

  7. celery分布式任务队列的使用

    一.Celery介绍和基本使用 Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery ...

  8. Celery分布式任务队列快速入门

    本节内容 1. Celery介绍和基本使用 2. 项目中使用Celery 3. Celery定时任务 4. Celery与Django结合 5. Django中使用计划任务 一  Celery介绍和基 ...

  9. 分布式任务队列 Celery —— Task对象

    转载至 JmilkFan_范桂飓:http://blog.csdn.net/jmilk  目录 目录 前文列表 前言 Task 的实例化 任务的名字 任务的绑定 任务的重试 任务的请求上下文 任务的继 ...

随机推荐

  1. phpexcel读取excel的xls xlsx csv格式

    我之前写过一篇PHP读取csv文件的内容 上代码index.php <?php /** * * @author XC * */ class Excel { public $currentShee ...

  2. &lbrack;译&rsqb; Paxos算法详解

    1. 概述 Paxos算法被用来实现一个容错的分布式系统,一直以来以晦涩难懂著称.这可能是因为该算法最开始使用希腊文表述的.事实上,它是所有分布式算法中最简单易懂的.Paxos算法的本质其实就是一个共 ...

  3. JSon 对象转字符的一些方法

    引用System.Web.Entity.dll public static string ToJSON(this object obj) { JavaScriptSerializer serializ ...

  4. WebForm之Linq组合查询

    组合查询 protected void Button1_Click(object sender, EventArgs e) { //默认查询所有,返回的是Table类型,转换成IQueryAble类型 ...

  5. 2016&period;6&period;11 ASP提交数据到SQL server数据乱码解决方法

    1.检查数据库排序规则 China-PRE-90-CS-AI 2.ASP文档中,写入数据的页面的编码和检查提交数据页面的编码一致:

  6. 【HDU 5030】Rabbit&&num;39&semi;s String &lpar;二分&plus;后缀数组)

    Rabbit's String Problem Description Long long ago, there lived a lot of rabbits in the forest. One d ...

  7. spring data jpa 组合条件查询封装

    /** * 定义一个查询条件容器 * @author lee * * @param <T> */ public class Criteria<T> implements Spe ...

  8. python-web自动化-元素定位

    # -*- coding:utf-8 -*- from selenium import webdriver from selenium.webdriver.common.by import By # ...

  9. 2018上IEC计算机高级语言&lpar;C&rpar;作业 第1次作业

    1.经过这几周的学习,总结一下学习的心得与体会.(不少于100字:10分) 学习c语言已经一个学期了,刚开始学习的时候老是感觉力不从心.虽然认真听课了, 但是并不能理解它.这种情况到了后来才有所改变. ...

  10. &lbrack;java初探09&rsqb;&lowbar;&lowbar;关于java的包装类

    前言 在Java语言的学习过程中,我们逐渐的理解了Java面向对象的思想,与类和对象的应用.但是在基本数据类型的使用上,我们无法将其定义为一个对象,通过使用对象的方法来使用它们,但是Java语言的思想 ...