来自消费者的Pika RabbitMQ发布

问题描述 投票:0回答:1

我有一个RabbitMQ消费者。我希望该使用者进行一些消息处理,并由time.sleep(10)模拟,然后将消息发布到其他队列中。我知道使用者回调具有一个理论上可以用于发布的通道,但是这似乎是一个不好的实现,因为如果basic_publish()以某种方式设法强制关闭通道,那么使用者就会死亡。处理此问题的最佳方法是什么?

import time
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare(queue='original_queue', exclusive=True)

channel.queue_bind(exchange='logs', queue='original_queue')

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    time.sleep(10)
    ch.basic_publish(exchange='logs', routing_key='different_queue', body='hello_world')

channel.basic_consume(
    queue='original_queue', on_message_callback=callback, auto_ack=True)

channel.start_consuming()
python python-3.x rabbitmq pika python-pika
1个回答
0
投票

您可以以某种方式实施您的使用者,如果连接断开,它会自动重新连接到RabbitMQ服务器。希望这会有所帮助(我在设计部分没有过多考虑,请随时提出一些建议!)

import time
import pika

reconnect_on_failure = True


def consumer(connection, channel):

    channel.exchange_declare(exchange='logs', exchange_type='fanout')
    result = channel.queue_declare(queue='original_queue', exclusive=True)
    channel.queue_bind(exchange='logs', queue='original_queue')
    print(' [*] Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):
        time.sleep(10)
        ch.basic_publish(exchange='logs', routing_key='different_queue', body='hello_world')

    channel.basic_consume(
  queue='original_queue', on_message_callback=callback, auto_ack=True)

    channel.start_consuming()


def get_connection_and_channel():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()


def start(reconnect_on_failure):
    connection, channel = get_connection_and_channel()
    consumer(connection, channel)
    # the if condition will be executed when the consumer's start_consuming loop exists
    if reconnect_on_failure:
        # cleanly close the connection and channel
        if not connection.is_closed():
            connection.close()
        if not channel.is_close():
            channel.close()
        start(reconnect_on_failure)


start(reconnect_on_failure)
© www.soinside.com 2019 - 2024. All rights reserved.