使用鼠兔在RabbitMQ中同步和阻塞消耗

问题描述 投票:9回答:1

我想与阻塞同步使用队列(RabbitMQ)。

注意:下面是准备好运行的完整代码。

系统设置使用RabbitMQ作为其排队系统,但我们的一个模块不需要异步消耗。

我尝试在BlockingConnection上使用basic_get,它不会阻塞(立即返回(None, None, None)):

# declare queue
get_connection().channel().queue_declare(TEST_QUEUE)
def blocking_get_1():

        channel = get_connection().channel()

        # get from an empty queue (prints immediately)
        print channel.basic_get(TEST_QUEUE)

我也试过使用consume generator,经过很长一段时间没用就失败了“连接已关闭”。

def blocking_get_2():
        channel = get_connection().channel()
        # put messages in TEST_QUEUE
        for i in range(4):
                channel.basic_publish(
                        '',
                        TEST_QUEUE,
                        'body %d' % i
                )
        consume_generator = channel.consume(TEST_QUEUE)
        print next(consume_generator)
        time.sleep(14400)
        print next(consume_generator)

有没有办法使用pika客户端使用RabbitMQ,因为我会在python中使用Queue.Queue?或类似的东西?

我现在的选择是忙等待(使用basic_get) - 但我宁愿使用现有系统来忙 - 等待,如果可能的话。

完整代码:

#!/usr/bin/env python
import pika
import time

TEST_QUEUE = 'test'
def get_connection():
        # define connection
        connection = pika.BlockingConnection(
                pika.ConnectionParameters(
                        host=YOUR_IP,
                        port=YOUR_PORT,
                        credentials=pika.PlainCredentials(
                                username=YOUR_USER,
                                password=YOUR_PASSWORD,
                        )
                )
        )
        return connection

# declare queue
get_connection().channel().queue_declare(TEST_QUEUE)
def blocking_get_1():

        channel = get_connection().channel()

        # get from an empty queue (prints immediately)
        print channel.basic_get(TEST_QUEUE)

def blocking_get_2():
        channel = get_connection().channel()
        # put messages in TEST_QUEUE
        for i in range(4):
                channel.basic_publish(
                        '',
                        TEST_QUEUE,
                        'body %d' % i
                )
        consume_generator = channel.consume(TEST_QUEUE)
        print next(consume_generator)
        time.sleep(14400)
        print next(consume_generator)


print "blocking_get_1"
blocking_get_1()

print "blocking_get_2"
blocking_get_2()

get_connection().channel().queue_delete(TEST_QUEUE)
python python-2.7 rabbitmq pika
1个回答
12
投票

Pika的一个常见问题是它目前没有在后台处理传入的事件。这基本上意味着在许多情况下,您需要定期调用connection.process_data_events()以确保它不会错过心跳。

这也意味着如果你长时间睡眠,pika将不会处理传入的数据,并最终死亡,因为它没有响应心跳。这里的一个选项是禁用心跳。

我通常通过在后台检查新事件来解决这个问题,如this示例中所示。

如果你想完全阻止我会做这样的事情(基于我自己的库AMQP-Storm)。

while True:
    result = channel.basic.get(queue='simple_queue', no_ack=False)
    if result:
        print("Message:", result['body'])
        channel.basic.ack(result['method']['delivery_tag'])
    else:
        print("Channel Empty.")
        sleep(1)

这是基于找到的here示例。

© www.soinside.com 2019 - 2024. All rights reserved.