我有一个使用 python pica 的方法。这里消费者需要为每个队列创建一个通道。当我编写这段代码时出现错误。
def Process(channel, exc, que):
channel.exchange_declare(exchange=exc, exchange_type='direct', durable=True)
result = channel.queue_declare(durable=True, queue=que, auto_delete=False,exclusive=False )
def callback_rabbit(ch,method,properties,body):
print("Message received = ", body)
channel.queue_bind(exchange=exc, queue=que, routing_key=que)
channel.basic_consume(on_message_callback=callback_rabbit,queue=que, auto_ack=True)
channel.start_consuming()
def Start():
credentials = pika.PlainCredentials('guest','guest')
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1','5672','/',credentials))
items = {"exc":[{"exchangeName":"bam", "queueName":"bam_queue1"},{"exchangeName":"bam", "queueName":"bam_queue2"},{"exchangeName":"bam", "queueName":"bam_queue3"}]}
for item in items["exc"]:
channel = connection.channel()
t_msg = Thread(target=Process, args=(channel,item["exchangeName"],item["queueName"]))
t_msg.start()
Pika 不是线程安全的。您应该在
Process
方法中创建一个新的连接和通道。