Celery cannot send tasks to the queue when running in Docker

Problem description

I tested this on Windows and it worked, but now I want to run it with Docker. The problem is that when I try to execute the task that sends an email to the user, I get the error:

[Errno 111] Connection refused

yet Celery itself starts successfully and connects to RabbitMQ. Why can't Celery send the task to RabbitMQ?

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/kombu/utils/functional.py", line 32, in __call__
    return self.__value__
           ^^^^^^^^^^^^^^

During handling of the above exception ('ChannelPromise' object has no attribute '__value__'), another exception occurred:
  File "/usr/local/lib/python3.11/dist-packages/kombu/connection.py", line 472, in _reraise_as_library_errors
    yield
    ^^^^^
  File "/usr/local/lib/python3.11/dist-packages/kombu/connection.py", line 459, in _ensure_connection
    return retry_over_time(
           
  File "/usr/local/lib/python3.11/dist-packages/kombu/utils/functional.py", line 318, in retry_over_time
    return fun(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/kombu/connection.py", line 941, in _connection_factory
    self._connection = self._establish_connection()
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/kombu/connection.py", line 867, in _establish_connection
    conn = self.transport.establish_connection()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/kombu/transport/pyamqp.py", line 203, in establish_connection
    conn.connect()
    ^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/amqp/connection.py", line 323, in connect
    self.transport.connect()
    ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/amqp/transport.py", line 129, in connect
    self._connect(self.host, self.port, self.connect_timeout)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/amqp/transport.py", line 184, in _connect
    self.sock.connect(sa)
    ^^^^^^^^^^^^^^^^^^^^^

The above exception ([Errno 111] Connection refused) was the direct cause of the following exception:
  File "/usr/local/lib/python3.11/dist-packages/django/core/handlers/exception.py", line 55, in inner
    response = get_response(request)
               ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/django/core/handlers/base.py", line 197, in _get_response
    response = wrapped_callback(request, *callback_args, **callback_kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/website/journal_website/views.py", line 281, in register_new_user
    send_email_message_to_user_with_activation_link.delay(new_user.pk, new_user_additional_data.code)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/celery/app/task.py", line 444, in delay
    return self.apply_async(args, kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/celery/app/task.py", line 594, in apply_async
    return app.send_task(
           
  File "/usr/local/lib/python3.11/dist-packages/celery/app/base.py", line 798, in send_task
    amqp.send_task_message(P, name, message, **options)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/celery/app/amqp.py", line 517, in send_task_message
    ret = producer.publish(
          
  File "/usr/local/lib/python3.11/dist-packages/kombu/messaging.py", line 186, in publish
    return _publish(
           
  File "/usr/local/lib/python3.11/dist-packages/kombu/connection.py", line 563, in _ensured
    return fun(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/kombu/messaging.py", line 195, in _publish
    channel = self.channel
              ^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/kombu/messaging.py", line 218, in _get_channel
    channel = self._channel = channel()
                              ^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/kombu/utils/functional.py", line 34, in __call__
    value = self.__value__ = self.__contract__()
                             ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/kombu/messaging.py", line 234, in <lambda>
    channel = ChannelPromise(lambda: connection.default_channel)
                                     ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/kombu/connection.py", line 960, in default_channel
    self._ensure_connection(**conn_opts)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/kombu/connection.py", line 458, in _ensure_connection
    with ctx():
    ^^^^^
  File "/usr/lib/python3.11/contextlib.py", line 155, in __exit__
    self.gen.throw(typ, value, traceback)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/kombu/connection.py", line 476, in _reraise_as_library_errors
    raise ConnectionError(str(exc)) from exc
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

docker-compose.yml:

version: "3.0"

services:
  # WEB
  django:
    build: .
    command: python3.11 manage.py runserver 0.0.0.0:8000
    container_name: django-server
    volumes:
      - media_volume:/website/journal_website/media
      - static_volume:/website/journal_website/static
      - database_volume:/website/journal_website/database
    ports:
      - "8000:8000"
    depends_on:
      - rabbit

  # Celery
  celery:
    build: .
    command: celery -A website worker -l info
    container_name: celery
    depends_on:
      - rabbit
  
  # RabbitMQ
  rabbit:
    hostname: rabbit
    container_name: rabbitmq
    image: rabbitmq:3.12-rc-management
    ports:
      # AMQP protocol port
      - "5672:5672"
      # HTTP management UI
      - "15672:15672"
    restart: always
      

volumes:
  media_volume:
  static_volume:
  database_volume:

celery.py:

from __future__ import absolute_import, unicode_literals
import os

from celery import Celery


os.environ.setdefault("DJANGO_SETTINGS_MODULE", "website.settings")
celery_application = Celery("website")
celery_application.config_from_object("django.conf:settings", namespace="CELERY")
celery_application.conf.broker_url = "amqp://rabbit:5672"
celery_application.autodiscover_tasks()
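
A common variant of this configuration, sketched here purely for illustration (the CELERY_BROKER_URL environment variable name is an assumption, not part of the original code), reads the broker URL from the environment so docker-compose can supply it per environment instead of hard-coding it:

from __future__ import absolute_import, unicode_literals
import os

from celery import Celery


os.environ.setdefault("DJANGO_SETTINGS_MODULE", "website.settings")
celery_application = Celery("website")
celery_application.config_from_object("django.conf:settings", namespace="CELERY")
# Illustrative: take the broker URL from the environment, falling back to the rabbit service name.
celery_application.conf.broker_url = os.environ.get("CELERY_BROKER_URL", "amqp://rabbit:5672")
celery_application.autodiscover_tasks()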

tasks.py:

from __future__ import absolute_import, unicode_literals
from celery import shared_task

# Some imports...

@shared_task
def send_email_message_to_user_with_activation_link(target_user_id: int, code: UUID) -> HttpResponse | None:
    target_user = User.objects.get(pk=target_user_id)
    content = {
        "email": target_user.email,
        "domain": "127.0.0.1:8000",
        "site_name": "Website",
        "user": target_user,
        "protocol": "http",
        "code": code,
    }

    message = render_to_string("user/account_activation/account_activation_email.txt", content)
    try:
        send_mail("Account activation", message, "[email protected]" , [target_user.email], fail_silently=False)
    except BadHeaderError:
        return HttpResponse("Invalid header found.")
Tags: python, django, docker, rabbitmq, celery
2 Answers

0 votes

I tried your docker-compose and it works fine for me. The only issue is that RabbitMQ takes about 20 seconds to start up and accept connections; until it is up you may see connection timeout errors. Once it is running, the Celery worker establishes its connection and works normally.

For development, when making changes it is best not to stop the rabbitmq container and to restart only the celery container; otherwise you will keep running into these timeouts while RabbitMQ starts up.
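
If you want Compose itself to wait for the broker before starting the other services, one option (a sketch, not part of the original answer; it requires a Compose version that supports the long depends_on form with conditions) is to give the rabbit service a healthcheck and have celery depend on it becoming healthy:

  rabbit:
    hostname: rabbit
    container_name: rabbitmq
    image: rabbitmq:3.12-rc-management
    healthcheck:
      # rabbitmq-diagnostics ships with the official image; ping succeeds once the node accepts connections
      test: ["CMD", "rabbitmq-diagnostics", "-q", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5

  celery:
    build: .
    command: celery -A website worker -l info
    container_name: celery
    depends_on:
      rabbit:
        condition: service_healthy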


0 votes

Solved; it now works on Linux with WSL 2. First, I needed to add some code to the __init__.py file in the project directory:

from .celery import celery_application as celery_app
__all__ = ["celery_app"]
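
This import matters because, without it, the Django web process never initializes the Celery app configured in celery.py, so .delay() publishes through Celery's default app and its default broker on localhost, which is refused inside the container. A quick way to check which broker the web container will actually publish to (a debugging sketch, run for example from python manage.py shell inside the django container) is:

from celery import current_app

# Prints the broker URL of the currently loaded Celery app.
# If the __init__.py import is missing, this will not show amqp://rabbit:5672
# but the unset default, which resolves to localhost at connection time.
print(current_app.conf.broker_url)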

Then, in the docker-compose file, I used a shared database volume for the django and celery containers, so the worker sees the same database as the web container:

  # Celery worker
  celery_worker:
    command: celery -A website worker -l info
    container_name: celery_worker
    volumes:
      - database_volume:/website/journal_website/database # just add this volume to the celery worker service
    image: django-image # use a common image for the django and celery containers, built from the same Dockerfile
    depends_on:
      - rabbitmq
      - django