如何在python中使用RabbitMQ进行简单的单元测试?

问题描述 投票:0回答:1

在我的单元测试中,我想简单地开始消耗、发布消息,并接收到一个响应返回,并断言响应是否是我所期望的。然而,我已经尝试做了几个小时,但没有找到解决方案。

问题是我不能在一个类中定义一个方法来停止消耗。我试着定义了一个这样的方法。

def stop(self):
    self.channel.basic_cancel()
def stop(self):
    self.channel.stop_consuming()
def stop(self):
    self.connection.close()

但似乎没有任何效果 我读到,这是因为一旦你执行了 start_consuming(),唯一的办法就是在发送消息后取消消耗。但如果我这样做,那么我就会修改原来的 on_request 而这对我的应用没有用,因为连接会在第一条消息后关闭。我发现 pytest-rabbitmq 但文档对我来说并不清楚,因此不知道是否可以使用这个插件来实现我想要的东西。

顺便说一下,这两者之间的区别是什么?basic_cancel, stop_consumingclose?

rabbitmq pika
1个回答
0
投票

我对你的方案没有一个清晰的认识!。以我的理解,你可以在同一个方法中创建连接和通道,这样你就可以在需要的时候发布、消费、断言和停止消费了。

希望能帮到你

def test_rabbitmq():
    from pika import BlockingConnection, ConnectionParameters, PlainCredentials

    conn = BlockingConnection(ConnectionParameters(host='host', virtual_host='vhost', credentials=PlainCredentials('username', 'password')))
    channel = conn.channel()

    # define your consumer
    def on_message(channel, method_frame, header_frame, body):
        message = body.decode()
        # assert your message here
        # asset message == 'value'
        channel.basic_cancel('test-consumer')  # stops the consumer

    # define your publisher
    def publish_message(message):
        channel.basic_publish(exchange='', routing_key='', body=message')

    publish('your message')
    tag = channel.basic_consume(queue='queue', on_message_callback=on_message, consumer_tag='test-consumer')

stop_consuming - 取消所有消费者,标志着start_consuming循环的退出。

basic_cancel - 本方法取消一个消费者。一个消费者标签将被作为输入。

close - 关闭连接通道。

参考资料

© www.soinside.com 2019 - 2024. All rights reserved.