如何在 django 的 celery 共享任务函数中运行通道函数?

问题描述 投票:0回答:0

我在一个 django proejct 中工作,同时使用 celery、redis、rabbitmq、channels。我正在使用两个队列 Queue A 和 Queue B 进行任务分配,并且我正在使用 rabbitmq 和 redis。我正在使用芹菜进行后台任务。现在我想运行通道 group_send 函数,或者更简单地说,我想在 celery 的共享任务函数中使用通道函数,这样我就可以使用通道并从共享任务函数实时发送消息。

试了很多方法还是报错,每次报错主要指向事件循环关闭了

这是我运行任务函数的 views.py 。

def myView(request , *args , **kwargs) :
    try : 
        myTask.delay({})
    except Exception as e : 
        print(e)
    return render(request , "app/htmlFile.html" , context = {} )

这是我的 tasks.py,我在其中创建了任务,并在其中使用通道函数 (group_send) 向通道组发送消息。

@shared_task(bind = True , queue = "A")
def myTask(self , data): 
    return None

现在我的各种组合,我使用不同的方法使它起作用。

  1. 使用 rabbitmq 作为通道并使用 async_to_sync 从通道发送消息。

任务.py

from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer


@shared_task(bind = True , queue = "A")
def myTask(self , data): 
    # few operations 
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(
        "MY_GROUP_NAME" , 
        { "type" : "sendMessage" , "data" : "data" } 
    ) 
    return None

和我的settings.py

CHANNEL_LAYERS = {
    "default" : {
        "BACKEND" : "channels_rabbitmq.core.RabbitmqChannelLayer" ,
        "CONFIG" : {
            "host" : "amqp://guest:guest@localhost:5672/" ,
        } 
    }
}

我收到这个错误:

 raise RuntimeError(\nRuntimeError: Refusing to initialize channel layer without a running 
event loop.\n\nIf you\'re writing a django-channels Worker, channels_rabbitmq does not 
support\nthis. Read the way to write workers in the carehare documentation.\n\nIf you\'re 
calling `async_to_sync()`, the call must be within code run by\n`sync_to_async()`. Django 
Channels guarantees this for Django views. Elsewhere,\nbeware: `asgiref` *does* let you to 
call `async_to_sync()` without\n`sync_to_async()`, but `channels_rabbitmq` *doesn\'t* -- 
hence this error.\n', 'args': '[{}]', 'kwargs': '{}', 'description': 'raised unexpected', 
'internal': False}

使用此配置我根本无法发送消息。终端崩溃并且根本无法运行。我没有在前端收到来自 websockets 的消息。 使用此配置时,创建任务时出现错误,运行任务时也会出现此错误。

  1. 现在当我将 redis 用于我的通道层并使用相同的配置发送消息时,我能够发送消息但错误仍然存在。错误说,事件循环已关闭。

设置.py

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("localhost", 6379)],
        },
    },
}

任务.py

@shared_task(bind = True , queue = "A")
def myTask(self , data): 
    channel_layer = get_channel_layer()
    try : 
        async_to_sync(channel_layer.group_send)(
            "MY_GROUP_NAME" , 
            { "type" : "sendMessage" , "data" : "data" } 
        ) 
    except Exception as e : 
        print(e)

错误:

  File "C:\Om\Project\BugMatricsEnv\Lib\site-packages\redis\asyncio\connection.py", line 729, in disconnect
    self._writer.close()  # type: ignore[union-attr]
    ^^^^^^^^^^^^^^^^^^^^
  File "C:\Python311\Lib\asyncio\streams.py", line 344, in close
    return self._transport.close()
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Python311\Lib\asyncio\selector_events.py", line 831, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "C:\Python311\Lib\asyncio\base_events.py", line 758, in call_soon
    self._check_closed()
  File "C:\Python311\Lib\asyncio\base_events.py", line 519, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

我还尝试了不同的方法,比如一起使用 sync_to_async 和 async_to_sync 来使事件循环工作。

但错误仍然存在。

任务.py

@shared_task(bind = True , queue = "A")
def myTask(self , data): 
    channel_layer = get_channel_layer()
    async_send = async_to_sync(channel_layer.group_send)
    wrapped_send = sync_to_async(async_send)
    async def send_message() : 
        await wrapped_send(
        "MY_GROUP_NAME" , 
        {
            "data" : "data" , 
            "type" : "sendMessage" , 
        }
    )
    loop = asyncio.get_event_loop()
    loop.run_until_complete(send_message())
    loop.close()

又报错了:

 raise RuntimeError(\'There is no current event loop in thread %r.\'\nRuntimeError: There is no current event loop in thread \'Dummy-2\'.\n', 'args': '[{}]', 'kwargs': '{}', 'description': 'raised unexpected', 'internal': False}

所以基本上所有的错误点都是没有事件循环可以附加任务来完成这项工作。

我还尝试使用共享任务中的另一个函数发送消息,但它是同一件事。

有什么方法可以实现这一点,我想让它在共享任务函数中工作。有什么办法可以通过频道从共享任务功能发送消息。

在这种情况下,创建另一个任务函数只是为了发送消息是不可行的。

任何帮助都会很棒。谢谢。

python django redis rabbitmq django-channels
© www.soinside.com 2019 - 2024. All rights reserved.