我正在尝试将JSON发送到Rabbitmq。这是我的制作人,它可以正常工作:
import pika
import json
credentials = pika.PlainCredentials('admin', '123')
parameters = pika.ConnectionParameters('192.168.1.11',
5672,
'/',
credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue='myapp')
message = {'fname': 'test', 'lname': 'test'}
channel.basic_publish(exchange='',
routing_key='myapp',
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode = 2,
))
print(" [x] Sent %r" % message)
connection.close()
这是接收器,它不正常:
import pika
import time
import json
credentials = pika.PlainCredentials('admin', '123')
parameters = pika.ConnectionParameters('192.168.1.12',
5672,
'/',
credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue='myapp')
def callback(ch, method, properties, body):
myjson = json.loads(body)
global fname
global lname
fname = myjson["fname"]
lname = myjson["lname"]
my_func(fname,lname)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='myapp',
auto_ack=True,
on_message_callback=callback)
channel.start_consuming()
def my_func(fname,lname):
pass
当我运行脚本时接收器显示以下问题:
NameError:全局名称“ my_func”未定义
问题是由于您的接收器中存在此功能。py
channel.start_using()
该功能将在此处阻止程序,并阻止其查看该功能。另一方面,代码分析工具,例如pylint,很可能无法检测到此类错误。
一个简单的解决方案是将代码更改为
def callback(ch, method, properties, body):
myjson = json.loads(body)
global fname
global lname
fname = myjson["fname"]
lname = myjson["lname"]
my_func(fname,lname)
def my_func(fname,lname):
pass
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='myapp',
auto_ack=True,
on_message_callback=callback)
channel.start_consuming()
更复杂的解决方案是创建您自己的类或方法,并以异步方式运行start_consumption()。这是我的示例:
def listen(self, exchange, routing, callback):
self.ReceiveChannel = self.connection.channel()
consumer_thread = threading.Thread(target=self.__consumer__, args=[exchange, routing, callback])
consumer_thread.start()