Celery 未接收在后台运行的任务

问题描述 投票:0回答:2

我在设置 celery 与我的 Flask 应用程序一起使用时遇到问题。我使用了准系统应用程序来测试配置,并发现我的 celery 工作程序已启动,但没有像所有教程中那样执行任何任务。基本上,当你调用 .delay() 函数时,它应该获取你的 python 函数并将其发送到 celery 在后台处理,但由于无法建立连接,事情挂起了。因此,可能我的配置不正确,或者我下载的软件版本之一存在我不知道的错误。

这是我的requirements.txt 文件的内容:

amqp==5.1.0
anyjson==0.3.3
async-timeout==4.0.2
beautifulsoup4==4.10.0
billiard==3.6.4.0
celery==5.2.3
cffi==1.15.0
click==8.0.4
click-didyoumean==0.3.0
click-plugins==1.1.1
click-repl==0.2.0
colorama==0.4.4
Deprecated==1.2.13
Flask==2.0.3
Flask-SQLAlchemy==2.5.1
greenlet==1.1.2
itsdangerous==2.1.2
Jinja2==3.1.1
kombu==5.2.4
MarkupSafe==2.1.1
packaging==21.3
prompt-toolkit==3.0.28
pycparser==2.21
pyparsing==3.0.7
pytz==2022.1
redis==4.2.0
six==1.16.0
soupsieve==2.3.1
SQLAlchemy==1.4.32
typing_extensions==4.1.1
vine==5.0.0
wcwidth==0.2.5
Werkzeug==2.0.3
wrapt==1.14.0
yahoofinancials==1.6

这是tasks.py。请注意注释掉的行,因为由于某种原因,如果没有指定后端,芹菜工作人员就无法正确启动,这也很奇怪。

from celery import Celery
from time import sleep

#app = Celery('tasks', broker='redis://localhost:6379')
app = Celery('tasks', backend='redis://localhost', broker='pyamqp://localhost')

@app.task
def add(x, y):
    return x + y

@app.task
def reverse(myString):
    sleep(5)
    return myString[::-1]

celery 应用程序在虚拟环境中正常启动:

C:\Users\Owner\My Drive\Documents\Scripts\virtual_envs\testApp\projectFiles>..\Scripts\activate

(testApp) C:\Users\Owner\My Drive\Documents\Scripts\virtual_envs\testApp\projectFiles>celery -A tasks worker --loglevel=INFO

 -------------- celery@DESKTOP-GHMPTB0 v5.2.3 (dawn-chorus)
--- ***** -----
-- ******* ---- Windows-10-10.0.19043-SP0 2022-03-31 12:07:03
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x24f8cfca1a0
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     redis://localhost/
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . tasks.add
  . tasks.reverse

[2022-03-31 12:07:03,550: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2022-03-31 12:07:03,565: INFO/MainProcess] mingle: searching for neighbors
[2022-03-31 12:07:04,128: INFO/SpawnPoolWorker-1] child process 240 calling self.run()
[2022-03-31 12:07:04,128: INFO/SpawnPoolWorker-4] child process 13564 calling self.run()
[2022-03-31 12:07:04,128: INFO/SpawnPoolWorker-3] child process 8584 calling self.run()
[2022-03-31 12:07:04,128: INFO/SpawnPoolWorker-2] child process 8344 calling self.run()
[2022-03-31 12:07:04,611: INFO/MainProcess] mingle: all alone
[2022-03-31 12:07:04,642: INFO/MainProcess] celery@DESKTOP-GHMPTB0 ready.

然后将函数调用发送到 celery 的结果给我一个连接错误。这是让我难受的部分。

(testApp) C:\Users\Owner\My Drive\Documents\Scripts\virtual_envs\testApp\projectFiles>python
Python 3.10.4 (tags/v3.10.4:9d38120, Mar 23 2022, 23:13:41) [MSC v.1929 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> from tasks import *
>>> result = add.delay(2,3)
Traceback (most recent call last):
  File "C:\Users\Owner\My Drive\Documents\Scripts\virtual_envs\testApp\lib\site-packages\redis\connection.py", line 614, in connect
    sock = self.retry.call_with_retry(
  File "C:\Users\Owner\My Drive\Documents\Scripts\virtual_envs\testApp\lib\site-packages\redis\retry.py", line 45, in call_with_retry
    return do()
  File "C:\Users\Owner\My Drive\Documents\Scripts\virtual_envs\testApp\lib\site-packages\redis\connection.py", line 615, in <lambda>
    lambda: self._connect(), lambda error: self.disconnect(error)
  File "C:\Users\Owner\My Drive\Documents\Scripts\virtual_envs\testApp\lib\site-packages\redis\connection.py", line 680, in _connect
    raise err
  File "C:\Users\Owner\My Drive\Documents\Scripts\virtual_envs\testApp\lib\site-packages\redis\connection.py", line 668, in _connect
    sock.connect(socket_address)
ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\Owner\My Drive\Documents\Scripts\virtual_envs\testApp\lib\site-packages\celery\backends\redis.py", line 119, in reconnect_on_error
    yield
  File "C:\Users\Owner\My Drive\Documents\Scripts\virtual_envs\testApp\lib\site-packages\celery\backends\redis.py", line 169, in _consume_from
    self._pubsub.subscribe(key)
  File "C:\Users\Owner\My Drive\Documents\Scripts\virtual_envs\testApp\lib\site-packages\redis\client.py", line 1549, in subscribe
    ret_val = self.execute_command("SUBSCRIBE", *new_channels.keys())
  File "C:\Users\Owner\My Drive\Documents\Scripts\virtual_envs\testApp\lib\site-packages\redis\client.py", line 1390, in execute_command
    self.connection = self.connection_pool.get_connection(
  File "C:\Users\Owner\My Drive\Documents\Scripts\virtual_envs\testApp\lib\site-packages\redis\connection.py", line 1386, in get_connection
    connection.connect()
  File "C:\Users\Owner\My Drive\Documents\Scripts\virtual_envs\testApp\lib\site-packages\redis\connection.py", line 620, in connect
    raise ConnectionError(self._error_message(e))
redis.exceptions.ConnectionError: Error 10061 connecting to localhost:6379. No connection could be made because the target machine actively refused it.

为了确认,我正在运行 python 版本 3.10.4,这是 celery 可接受的版本。

(testApp) C:\Users\Owner\My Drive\Documents\Scripts\virtual_envs\testApp\projectFiles>python --version
Python 3.10.4

有人看出哪里出了问题吗?如果我无法让后台任务正常工作,我就无法真正推进我的实际项目。我是芹菜新手,正在尝试解决它,但如果我无法完成这项工作,我愿意更换经纪人或调度软件。

python redis celery amqp
2个回答
0
投票

“连接被拒绝”,这意味着没有任何东西监听您尝试连接的 IP:端口。你应该检查你的 Redis 实例。

调试提示:可以去掉后端参数,重新测试一下,如果一切正常,就确定Redis连接有问题。


0
投票

尝试运行:

celery -A 任务工作者 --loglevel=INFO -P 独奏

或安装eventlet

pip 安装 eventlet

结束,尝试运行:

celery -A 任务工作者 --loglevel=INFO -P eventlet

© www.soinside.com 2019 - 2024. All rights reserved.