使用带有SQS的Celery时,Errno 111连接被拒绝

时间:2021-12-15 13:00:53

I am having a problem when I run the celery status or celery purge commands.

当我运行芹菜状态或芹菜清除命令时,我遇到了问题。

 File "/usr/bin/celery", line 11, in <module>
    sys.exit(main())
  File "/usr/lib/python2.7/site-packages/celery/__main__.py", line 30, in main
    main()
  File "/usr/lib/python2.7/site-packages/celery/bin/celery.py", line 81, in main
    cmd.execute_from_commandline(argv)
  File "/usr/lib/python2.7/site-packages/celery/bin/celery.py", line 769, in execute_from_commandline
    super(CeleryCommand, self).execute_from_commandline(argv)))
  File "/usr/lib/python2.7/site-packages/celery/bin/base.py", line 306, in execute_from_commandline
    return self.handle_argv(self.prog_name, argv[1:])
  File "/usr/lib/python2.7/site-packages/celery/bin/celery.py", line 761, in handle_argv
    return self.execute(command, argv)
  File "/usr/lib/python2.7/site-packages/celery/bin/celery.py", line 693, in execute
    ).run_from_argv(self.prog_name, argv[1:], command=argv[0])
  File "/usr/lib/python2.7/site-packages/celery/bin/base.py", line 310, in run_from_argv
    sys.argv if argv is None else argv, command)
  File "/usr/lib/python2.7/site-packages/celery/bin/base.py", line 372, in handle_argv
    return self(*args, **options)
  File "/usr/lib/python2.7/site-packages/celery/bin/base.py", line 269, in __call__
    ret = self.run(*args, **kwargs)
  File "/usr/lib/python2.7/site-packages/celery/bin/celery.py", line 472, in run
    replies = I.run('ping', **kwargs)
  File "/usr/lib/python2.7/site-packages/celery/bin/celery.py", line 324, in run
    return self.do_call_method(args, **kwargs)
  File "/usr/lib/python2.7/site-packages/celery/bin/celery.py", line 346, in do_call_method
    callback=self.say_remote_command_reply)
  File "/usr/lib/python2.7/site-packages/celery/bin/celery.py", line 385, in call
    return getattr(i, method)(*args)
  File "/usr/lib/python2.7/site-packages/celery/app/control.py", line 99, in ping
    return self._request('ping')
  File "/usr/lib/python2.7/site-packages/celery/app/control.py", line 70, in _request
    timeout=self.timeout, reply=True,
  File "/usr/lib/python2.7/site-packages/celery/app/control.py", line 306, in broadcast
    limit, callback, channel=channel,
  File "/usr/lib/python2.7/site-packages/kombu/pidbox.py", line 283, in _broadcast
    chan = channel or self.connection.default_channel
  File "/usr/lib/python2.7/site-packages/kombu/connection.py", line 755, in default_channel
    self.connection
  File "/usr/lib/python2.7/site-packages/kombu/connection.py", line 740, in connection
    self._connection = self._establish_connection()
  File "/usr/lib/python2.7/site-packages/kombu/connection.py", line 695, in _establish_connection
    conn = self.transport.establish_connection()
  File "/usr/lib/python2.7/site-packages/kombu/transport/pyamqp.py", line 112, in establish_connection
    conn = self.Connection(**opts)
  File "/usr/lib/python2.7/site-packages/amqp/connection.py", line 165, in __init__
    self.transport = create_transport(host, connect_timeout, ssl)
  File "/usr/lib/python2.7/site-packages/amqp/transport.py", line 294, in create_transport
    return TCPTransport(host, connect_timeout)
  File "/usr/lib/python2.7/site-packages/amqp/transport.py", line 95, in __init__
    raise socket.error(last_err)
socket.error: [Errno 111] Connection refused

I am using SQS BROKER_URL. The tasks are running fine, but when I want to purge the tasks on a queue (celery purge -f), I get the above error.

我正在使用SQS BROKER_URL。任务运行正常,但是当我想清除队列上的任务(celery purge -f)时,我得到了上述错误。

software -> celery:3.1.11 (Cipater) kombu:3.0.18 py:2.7.5
            billiard:3.3.0.17 py-amqp:1.4.5
platform -> system:Linux arch:64bit, ELF imp:CPython
loader   -> celery.loaders.default.Loader
settings -> transport:amqp results:disabled

My server has ports 22, 80, 443, 8000 ports open and there are tons of messages in the SQS celery queue, so the connection between celery and SQS should be fine.

我的服务器端口22,80,443,8000端口打开,SQS芹菜队列中有大量消息,因此芹菜和SQS之间的连接应该没问题。

1 个解决方案

#1


10  

Based on documentation of the commands status and purge you need to provide celery with the celery app you're referring to so that it knows what broker to use. By just typing $celery purge or by typing $celery status celery doesn't know what celery app you're targeting and so fails.

根据命令状态和清除的文档,您需要提供celery与您所指的芹菜应用程序,以便它知道要使用的代理。只需键入$ celery purge或键入$ celery status celery就不知道你所针对的是什么芹菜应用程序,因此失败了。

Therefore, go to your celery app

因此,去你的芹菜应用程序

$cd /path/to/your/celery/app/directory

$ cd / path / to / your / celery / app /目录

and then call celery purge on your app. In this example my directory has celeryapp.py and the contents are:

然后在您的应用程序上调用celery purge。在这个例子中,我的目录有celeryapp.py,内容是:

from config import config
from celery import Celery
celery_app = Celery('tasks', 
                    backend=config.celery_backend_uri, 
                    broker=config.celery_broker_uri)
celery_app.conf.update(
    CELERY_IMPORTS=(
        'app.module_a.tasks',   # we're not including our tasks here as
        'app.module_b.tasks',   # our tasks are in other files listed here
    )
)

the contents are not as important as our call, but they are provided to show that we have our celery app inside celeryapp.py so I can call

内容并不像我们的电话那么重要,但提供它们是为了表明我们在celeryapp.py中有我们的芹菜应用程序,所以我可以打电话

$celery -A celeryapp status
worker-name-a@node-name: OK
worker-name-b@node-name: OK

or

要么

$celery -A celeryapp purge
WARNING: This will remove all tasks from queue: celery.
         There is no undo for this operation! 
(to skip this prompt use the -f option)
Are you sure you want to delete all tasks (yes/NO)? yes
No messages purged from 1 queue

I had a similar question here and Sol seemed to confirm that celery will output this error if no app is provided by stating

我在这里有一个类似的问题,Sol似乎确认芹菜将输出此错误,如果没有通过说明提供应用程序

How would it know what broker transport to use if you don't give it the location of the app?

如果你不给它应用程序的位置,它将如何知道要使用的代理运输?

#1


10  

Based on documentation of the commands status and purge you need to provide celery with the celery app you're referring to so that it knows what broker to use. By just typing $celery purge or by typing $celery status celery doesn't know what celery app you're targeting and so fails.

根据命令状态和清除的文档,您需要提供celery与您所指的芹菜应用程序,以便它知道要使用的代理。只需键入$ celery purge或键入$ celery status celery就不知道你所针对的是什么芹菜应用程序,因此失败了。

Therefore, go to your celery app

因此,去你的芹菜应用程序

$cd /path/to/your/celery/app/directory

$ cd / path / to / your / celery / app /目录

and then call celery purge on your app. In this example my directory has celeryapp.py and the contents are:

然后在您的应用程序上调用celery purge。在这个例子中,我的目录有celeryapp.py,内容是:

from config import config
from celery import Celery
celery_app = Celery('tasks', 
                    backend=config.celery_backend_uri, 
                    broker=config.celery_broker_uri)
celery_app.conf.update(
    CELERY_IMPORTS=(
        'app.module_a.tasks',   # we're not including our tasks here as
        'app.module_b.tasks',   # our tasks are in other files listed here
    )
)

the contents are not as important as our call, but they are provided to show that we have our celery app inside celeryapp.py so I can call

内容并不像我们的电话那么重要,但提供它们是为了表明我们在celeryapp.py中有我们的芹菜应用程序,所以我可以打电话

$celery -A celeryapp status
worker-name-a@node-name: OK
worker-name-b@node-name: OK

or

要么

$celery -A celeryapp purge
WARNING: This will remove all tasks from queue: celery.
         There is no undo for this operation! 
(to skip this prompt use the -f option)
Are you sure you want to delete all tasks (yes/NO)? yes
No messages purged from 1 queue

I had a similar question here and Sol seemed to confirm that celery will output this error if no app is provided by stating

我在这里有一个类似的问题,Sol似乎确认芹菜将输出此错误,如果没有通过说明提供应用程序

How would it know what broker transport to use if you don't give it the location of the app?

如果你不给它应用程序的位置,它将如何知道要使用的代理运输?