Celery async tasks executed repeatedly (Redis as broker)

Time: 2021-10-29 14:43:13

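I previously wrote about using Celery to run time-consuming or resource-heavy tasks asynchronously. Recently, while analyzing data, I noticed something odd: some records were duplicated. The natural suspect was that async tasks were being executed more than once.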

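After some digging, I found that if a task takes too long, so that it is not finished within the broker's time limit (the visibility timeout, one hour by default for Redis), the task is assigned to a worker again.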

Visibility Timeout

The visibility timeout defines the number of seconds to wait for the worker to acknowledge the task before the message is redelivered to another worker. Be sure to see Caveats below.

This option is set via the broker_transport_options setting:

app.conf.broker_transport_options = {'visibility_timeout': 3600}  # 1 hour.

The default visibility timeout for Redis is 1 hour.

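The problem was that no async task in my application could ever run longer than Redis's one-hour default, so the issue had to lie in this "acknowledge". At first I took "acknowledge" to mean that the worker had received the task sent by the broker. But checking workererr.log, I found: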

[2019-03-01 14:20:30,695: INFO/MainProcess] Received task: task_async[4e0378e2-ff5d-4394-a842-ece2d1c8118a] ETA:[2019-03-01 15:44:03.692831+08:00]

[2019-03-01 15:23:58,477: INFO/MainProcess] Received task: task_async[4e0378e2-ff5d-4394-a842-ece2d1c8118a] ETA:[2019-03-01 15:44:03.692831+08:00]

[2019-03-01 15:44:04,620: INFO/ForkPoolWorker-2] Task task_async[4e0378e2-ff5d-4394-a842-ece2d1c8118a] succeeded in 0.003580662072636187s: None

[2019-03-01 15:44:04,621: INFO/ForkPoolWorker-1] Task task_async[4e0378e2-ff5d-4394-a842-ece2d1c8118a] succeeded in 0.004984764964319766s: None

1. The duplicated task was delivered more than once, about an hour apart.

2. The worker received the same task (same ID) multiple times, each time with the same ETA (scheduled execution time).

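3. Once the ETA arrived, the task was picked up and executed by several pool processes (ForkPoolWorker-1 and ForkPoolWorker-2), and each run was short.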
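The timings in the log can be checked with a few lines of Python. This sketch assumes the worker's log timestamps are in the same UTC+08:00 zone as the ETA, since the log lines themselves carry no offset.

```python
from datetime import datetime, timedelta, timezone

# Assumption: log timestamps are local time in UTC+08:00, like the ETA.
CST = timezone(timedelta(hours=8))
VISIBILITY_TIMEOUT = 3600  # Redis default visibility timeout, in seconds


def parse_log_time(text):
    """Parse a Celery log timestamp such as '2019-03-01 14:20:30,695'."""
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S,%f").replace(tzinfo=CST)


first_received = parse_log_time("2019-03-01 14:20:30,695")
second_received = parse_log_time("2019-03-01 15:23:58,477")
eta = datetime.fromisoformat("2019-03-01 15:44:03.692831+08:00")

redelivery_gap = (second_received - first_received).total_seconds()
eta_delay = (eta - first_received).total_seconds()

print(f"redelivered after {redelivery_gap:.0f} s")          # redelivered after 3808 s
print(f"ETA was {eta_delay:.0f} s after first receipt")      # ETA was 5013 s after first receipt
print("ETA exceeds visibility timeout:", eta_delay > VISIBILITY_TIMEOUT)  # True
```

The second delivery arrives a little over an hour after the first, and the ETA lies about 83 minutes after the first receipt, i.e. beyond the one-hour visibility timeout.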

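So why was the task, received at 14:20:30, delivered again at 15:23:58? The answer is that acknowledgement is not complete at the moment the task is "Received". Here is Celery's explanation of "acknowledged":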

acknowledged

Workers acknowledge messages to signify that a message has been handled. Failing to acknowledge a message will cause the message to be redelivered. Exactly when a transaction is considered a failure varies by transport. In AMQP the transaction fails when the connection/channel is closed (or lost), but in Redis/SQS the transaction times out after a configurable amount of time (the visibility_timeout).

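So acknowledgement is tied to the message being handled, not merely received. Because my scheduled task was set to run more than an hour later, the first delivery was received but not yet executed, and therefore not acknowledged; after one hour the broker delivered the task again.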
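To make the timeline concrete, here is a toy model of the rule. It is not Kombu's actual Redis transport code, and `count_executions` is a hypothetical helper. It assumes the message is first delivered at t = 0, stays unacknowledged until its ETA, is redelivered whenever a copy has been unacknowledged for `visibility_timeout` seconds, and that every delivered copy executes once the ETA arrives.

```python
def count_executions(eta_delay, visibility_timeout=3600):
    """Toy model: how many times an ETA task runs when it is only acked at execution.

    Hypothetical assumptions, for illustration only:
    - the broker first delivers the message at t = 0 (seconds);
    - the worker holds each copy unacknowledged until t = eta_delay;
    - a copy unacknowledged for visibility_timeout seconds is redelivered;
    - every delivered copy is executed once the ETA is reached.
    """
    deliveries = [0]
    # Keep redelivering while the latest copy would time out before the ETA.
    while deliveries[-1] + visibility_timeout < eta_delay:
        deliveries.append(deliveries[-1] + visibility_timeout)
    return len(deliveries)


print(count_executions(5013))                           # 2: the two runs seen in the log
print(count_executions(5013, visibility_timeout=7200))  # 1: timeout longer than the ETA delay
```

With the roughly 5013-second ETA from the log, the default one-hour timeout yields two executions, while a timeout longer than the ETA delay yields one.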

While fixing this, I went back to a passage from the Celery documentation:

Ideally task functions should be idempotent: meaning the function won’t cause unintended effects even if called multiple times with the same arguments. Since the worker cannot detect if your tasks are idempotent, the default behavior is to acknowledge the message in advance, just before it’s executed, so that a task invocation that already started is never executed again.

Best practice: tasks should be idempotent!
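A minimal sketch of what idempotence means for the duplicated-data problem, assuming the task's side effect is writing a row to a database. The table and function names are hypothetical, and sqlite3 stands in for whatever store a real task writes to. Keying the write on the task id means a redelivered copy (which keeps the same id, as the log shows) overwrites instead of duplicating. In a real Celery task declared with `bind=True`, the id is available as `self.request.id`.

```python
import sqlite3


def make_db():
    """Create an in-memory database with two hypothetical result tables."""
    conn = sqlite3.connect(":memory:")
    # Append-only table: nothing stops the same task from writing twice.
    conn.execute("CREATE TABLE events (task_id TEXT, value TEXT)")
    # Keyed table: task_id is the primary key, so one task maps to one row.
    conn.execute("CREATE TABLE results (task_id TEXT PRIMARY KEY, value TEXT)")
    return conn


def store_not_idempotent(conn, task_id, value):
    # Every execution inserts a new row, so a duplicate run duplicates data.
    conn.execute("INSERT INTO events (task_id, value) VALUES (?, ?)", (task_id, value))


def store_idempotent(conn, task_id, value):
    # INSERT OR REPLACE overwrites the existing row for this task_id,
    # so running again with the same id leaves a single row.
    conn.execute(
        "INSERT OR REPLACE INTO results (task_id, value) VALUES (?, ?)",
        (task_id, value),
    )


def count_rows(conn, table):
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]


if __name__ == "__main__":
    conn = make_db()
    task_id = "4e0378e2-ff5d-4394-a842-ece2d1c8118a"  # the duplicated id from the log
    for _ in range(2):  # simulate the task running twice
        store_not_idempotent(conn, task_id, "done")
        store_idempotent(conn, task_id, "done")
    print(count_rows(conn, "events"), count_rows(conn, "results"))  # 2 1
```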

To sum up:

1. A task is not acknowledged when it is received; by default it is acknowledged when it is executed (only then is it removed from the broker's queue).

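2. All my tasks are scheduled tasks (with ETAs more than an hour away), so I set visibility_timeout longer than my scheduling delay, and the duplicate executions stopped.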

3. If tasks run for a long time or are scheduled far ahead, and you strictly need each one to run only once, take a look at celery_once.

4. Read the official docs carefully!!
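The visibility_timeout change from the summary could look like the following config module. This is a minimal sketch assuming a local Redis at redis://localhost:6379/0 and that no task is scheduled more than two hours ahead; both are hypothetical. The Celery Redis broker docs advise raising the visibility timeout to match the longest ETA you plan to use.

```python
# celeryconfig.py (hypothetical module name)
# Load it from the app module with: app.config_from_object("celeryconfig")

broker_url = "redis://localhost:6379/0"  # assumed local Redis instance

broker_transport_options = {
    # Seconds a worker may hold a message unacknowledged before Redis
    # redelivers it. Assumption: the longest ETA is 2 hours, so 3 hours
    # leaves a margin and scheduled tasks are not handed out twice.
    "visibility_timeout": 3 * 60 * 60,
}
```

The trade-off, as the same docs note, is that a long visibility timeout only delays redelivery of tasks that are genuinely lost, for example when a worker is killed or loses power, because Celery already redelivers messages on normal worker shutdown.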

Ref:

Scheduled tasks are being duplicated

https://github.com/cameronmaske/celery-once

http://docs.celeryproject.org/en/latest/getting-started/brokers/redis.html#visibility-timeout

https://github.com/celery/django-celery/issues/176