我正在开发一个带有pika 0.10.0和python 2.7版本的RabbitMQ使用者。在我的消费者客户端中,我有一个根据输入消息运行一段时间的进程。它可以在3到40分钟之间变化。我不想禁用心跳。相反,我正在寻找一些回复机制,可以保持连接活着,直到delivery_tag被发回。那可能吗?
我得到的链接很少,所有人都建议禁用心跳作为解决方法。但我不想禁用它。
参考:
Socket Error: 104 consuming messages with task that take a long time #753
BlockingConnection gets closed unexpectedly #734
另外,如果需要任何额外信息,请告诉我。提前致谢。
唯一的解决方案是定期发送心跳帧。
当使用BlockingConnection
时,你必须经常调用process_data_events
函数(time_limit
为零)。使用SelectConnection
或其他异步适配器时,必须确保没有任何进程阻塞,以便可以发送帧。
如果您的任务长时间运行并且由于某种原因您无法轻松中断或拆分该过程,则可以在另一个线程/进程中运行该任务,并且仍然可以从主线程发送pika帧。请记住,您应该避免跨线程使用pika连接(目前pika不是线程安全的)。
您将不得不使用额外的线程来处理BlockingConnection。我用threading
模块编写了一个简单的Producer和Consumer。
基本的想法是
源代码是here
还有一些我没有涉及的对话,但我认为基本的想法可能会有所帮助。