我有一个导入以下模块的类:
import pika
import pickle
from apscheduler.schedulers.background import BackgroundScheduler
import time
import logging
class RabbitMQ():
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
self.channel = self.connection.channel()
self.sched = BackgroundScheduler()
self.sched.add_job(self.keep_connection_alive, id='clean_old_data', trigger='cron', hour = '*', minute='*', second='*/50')
self.sched.start()
def publish_message(self, message , path="path"):
message["path"] = path
logging.info(message)
message = pickle.dumps(message)
self.channel.basic_publish(exchange="", routing_key="server", body=message)
def keep_connection_alive(self):
self.connection.process_data_events()
rabbitMQ = RabbitMQ()
def publish_message(message , path="path"):
rabbitMQ.publish_message(message, path=path)
我的class.py:
import RabbitMQ as rq
class MyClass():
...
在为MyClass生成单元测试时,我无法模拟这部分代码的连接。并保持抛出异常。它根本不起作用
pika.exceptions.ConnectionClosed: Connection to 127.0.0.1:5672 failed: [Errno 111] Connection refused
我尝试了几种方法来模拟这种连接,但这些方法似乎都不起作用。我想知道我能做些什么来支持这种测试?模拟整个RabbitMQ模块?或者也许只是模拟连接
就像上面提到的评论者一样,问题是你的全球创造你的RabbitMQ
。
我的下意识反应是说“只是摆脱它,你的模块级publish_message
”。如果你能做到这一点,那就去寻找解决方案吧。你在publish_message
上有一个RabbitMQ
接受相同的args;然后,任何调用者都应该创建一个RabbitMQ
类的实例。
如果你因为某种原因不想或不想这样做,你应该只是移动你的模块级publish_message
中移动该对象实例化的实例化,如下所示:
def publish_message(message , path="path"):
rabbitMQ = RabbitMQ()
rabbitMQ.publish_message(message, path=path)
这会在每次调用时创建一个新连接。也许那没关系...但也许不是。因此,为了避免创建重复的连接,您需要引入类似单例模式的内容:
class RabbitMQ():
__instance = None
...
@classmethod
def get_instance(cls):
if cls.__instance is None:
cls.__instance = RabbitMQ()
return cls.__instance
def publish_message(message , path="path"):
RabbitMQ.get_instance().publish_message(message, path=path)
理想情况下,您需要完全避免使用单例模式。无论调用者应该存储RabbitMQ
对象的单个实例并直接调用publish_message
。
所以TLDR /理想的解决方案IMO:摆脱最后3行。调用者应该创建一个RabbitMQ
对象。
编辑:哦,以及它发生的原因 - 导入该模块时,正在评估:rabbitMQ = RabbitMQ()
。在评估完成后,您尝试模拟它,并且无法连接。