django / celery: best practices for running tasks on 150k Django objects?

Time: 2022-02-02 20:29:30

I have to run tasks on approximately 150k Django objects. What is the best way to do this? I am using the Django ORM as the broker. The MySQL database backend chokes and dies while task.delay() is being called for all of the tasks. Relatedly, I also wanted to kick this off from a form submission, but the resulting request had such a long response time that it timed out.

3 Answers

#1


10  

I would also consider using something other than the database as the "broker". It really isn't suitable for this kind of work.

Still, you can move some of this overhead out of the request/response cycle by launching a task that creates the other tasks:

from celery.task import TaskSet, task

from myapp.models import MyModel

@task
def process_object(pk):
    obj = MyModel.objects.get(pk=pk)
    # do something with obj

@task
def process_lots_of_items(ids_to_process):
    return TaskSet(process_object.subtask((id, ))
                       for id in ids_to_process).apply_async()

Also, since you probably don't have 150,000 processors to process all of these objects in parallel, you could split the objects into chunks of, say, 100 or 1000:

from itertools import islice
from celery.task import TaskSet, task
from myapp.models import MyModel

def chunks(it, n):
    # yield successive lists of up to n items drawn from the iterator
    for first in it:
        yield [first] + list(islice(it, n - 1))

@task
def process_chunk(pks):
    objs = MyModel.objects.filter(pk__in=pks)
    for obj in objs:
        pass  # do something with obj

@task
def process_lots_of_items(ids_to_process):
    return TaskSet(process_chunk.subtask((chunk, ))
                       for chunk in chunks(iter(ids_to_process),
                                           1000)).apply_async()
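
The chunks() helper is plain Python, so its behavior is easy to verify in isolation (the same function as above, exercised outside Celery):

```python
from itertools import islice

def chunks(it, n):
    # yield successive lists of up to n items drawn from the iterator
    for first in it:
        yield [first] + list(islice(it, n - 1))

# 10 ids split into chunks of 3; the final chunk holds the remainder
print(list(chunks(iter(range(10)), 3)))  # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
```

Note that chunks() needs a true iterator (hence the iter(ids_to_process) call above): given a plain list, islice would restart from the front on every pass and produce overlapping chunks.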

#2


2  

Try using RabbitMQ instead.

RabbitMQ is used by a lot of larger companies, and people really rely on it: it is a mature, dependable broker.

Here is a great tutorial to get you started with it.

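
For context, pointing Celery at RabbitMQ is a one-line settings change. A minimal sketch, assuming a local RabbitMQ with the default guest account (host, credentials, and vhost are placeholders):

```python
# settings.py -- broker setting used by Celery's Django integration
BROKER_URL = "amqp://guest:guest@localhost:5672//"
```
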
#3


1  

I use beanstalkd (http://kr.github.com/beanstalkd/) as the engine. Adding a worker and a task is pretty straightforward in Django if you use django-beanstalkd: https://github.com/jonasvp/django-beanstalkd/

It’s very reliable for my usage.

Example of a worker:

import os
import time

from django_beanstalkd import beanstalk_job


@beanstalk_job
def background_counting(arg):
    """
    Do some incredibly useful counting to the value of arg
    """
    value = int(arg)
    pid = os.getpid()
    print("[%s] Counting from 1 to %d." % (pid, value))
    for i in range(1, value + 1):
        print("[%s] %d" % (pid, i))
        time.sleep(1)

To launch a job/worker/task:

from django_beanstalkd import BeanstalkClient
client = BeanstalkClient()

client.call('beanstalk_example.background_counting', '5')

(source extracted from the example app of django-beanstalkd)

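The job body itself is ordinary Python, so the counting logic can be sanity-checked without a beanstalkd server. A sketch (not from the original example) with the one-second sleep made injectable purely so the loop runs instantly:

```python
import os

def background_counting(arg, sleep=lambda seconds: None):
    # count from 1 to int(arg), mirroring the beanstalk job above
    value = int(arg)
    pid = os.getpid()
    print("[%s] Counting from 1 to %d." % (pid, value))
    counted = []
    for i in range(1, value + 1):
        print("[%s] %d" % (pid, i))
        counted.append(i)
        sleep(1)  # the real job pauses one second between steps
    return counted

print(background_counting("5"))  # [1, 2, 3, 4, 5]
```
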
Enjoy!
