Python - RabbitMQ Pika 消费者 - 如何使用异步函数作为回调

问题描述 投票:0回答:2

我有以下代码,我在其中初始化监听队列的消费者。

consumer = MyConsumer()
consumer.declare_queue(queue_name="my-jobs")
consumer.declare_exchange(exchange_name="my-jobs")
consumer.bind_queue(
    exchange_name="my-jobs", queue_name="my-jobs", routing_key="jobs"
)
consumer.consume_messages(queue="my-jobs", callback=consumer.consume)

问题是consum方法定义如下:

async def consume(self, channel, method, properties, body):

在consume方法中,我们需要await异步函数,但这会为consume函数产生错误“coroutine is not waiting”。有没有办法在 pika 中使用异步函数作为回调?

python rabbitmq pika
2个回答
4
投票

我用

@sync
注释了我的回调,其中同步是:

def sync(f):
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        return asyncio.get_event_loop().run_until_complete(f(*args, **kwargs))
    return wrapper

(在here找到了芹菜,但它也适用于鼠兔)


0
投票

我也有类似的疑问,我最终使用了AsyncioConnection适配器

class Consumer:
  
  def __init__(self, loop, ...):
    self._loop = loop
    self._in_flight_tasks = set()

  def connect(self):
    return AsyncioConnection(
      parameters=...
      custom_ioloop=self._loop,
    )

  ...

  
  async def _handle_message(...):
    ...
  
  def on_message(self, _unused_channel, basic_deliver, properties, body):
      task = self._loop.create_task(self._handle_message(tag, properties, body))
      self._in_flight_tasks.add(task)
      task.add_done_callback(self._in_flight_tasks.discard)

  ...

请注意,我将事件循环传递给消费者。我使用应用程序顶部的

asyncio.new_event_loop()
创建它。我不确定这是否是必需的,但可能是因为 Pika 我们默认使用一些自定义事件循环实现。

大部分消费者代码取自 Pika examples

有关为何将任务添加到集合中然后又被丢弃的解释,请参阅此处

© www.soinside.com 2019 - 2024. All rights reserved.