How to catch a heartbeat timeout on a Python STOMP connection

Question

I frequently run into this heartbeat timeout, after which the receiver sits idle. The skeleton of my code looks like this:

import time

import stomp

def connect_and_subscribe(conn):
    the_id = 1111
    user = 'my_user'
    password = 'my_password'
    destination = 'my_destination'
    subscription_name = 'my_subscriber'

    conn.connect(login=user,passcode=password, wait=True, wait_time=120, headers = {'client-id': 'xxxx'})
    # activemq.subscriptionName makes this a durable subscription
    conn.subscribe(destination=destination, id=the_id, ack='auto', 
                   persistent=True, 
                   headers = {"activemq.subscriptionName":subscription_name, "activemq.persistent":"true"})

class MyListener(stomp.ConnectionListener):
  
  def __init__(self, conn, queue):
    self.conn = conn
    self.count = 0
    self.errors = 0
    self.stop = False
    self.queue = queue
    self.start = time.time()  # read in on_message to report elapsed time
  
  def on_error(self, message):
    print('Received an error %s' % message)
    self.stop = True
    
  def on_message(self, message):
    if message == "SHUTDOWN":
        diff = time.time() - self.start
        print("Received %s in %f seconds" % (self.count, diff))
        print("Receiver shutdown")
        self.stop = True
      
    else:
        item = (self.count, message)
        self.queue.put(item)        
        self.count += 1
        
  def on_disconnect(self):
      print_log('Disconnected, going to restart ...')
      connect_and_subscribe(self.conn)
      self.stop = False

# producer task
def producer(queue):
    host = "my_host"
    port = my_port
    destination = "my_destination"
    heartbeats = 4000
    subscription_name = "my_subscriber"
    
    conn = stomp.Connection12(host_and_ports = [(host, port)], timeout=120, heartbeats=(heartbeats, heartbeats))
    listener = MyListener(conn, queue)
    conn.set_listener('', listener)

    print('Listen to ' + host + ':' + str(port) + ' at destination:' + destination)
    print ('Subscription name:' + subscription_name + ' with heartbeats:' + str(heartbeats))
    connect_and_subscribe(conn)
    
    while not listener.stop:
        time.sleep(30)
        if check_stop(): # a signal will be given if I want to stop the process
            print('Listener is stopped')
            listener.stop = True
                
    print('Producer is stopped')    
    print('Producer has received ' + str(listener.count) + ' messages in total')
    queue.put(None)
    conn.disconnect()
    print('Producer: Done')

After the destination has sent no messages for a long time, I get this error:

heartbeat timeout: diff_receive=6.01600000000326, time=372544.906, lastrec=372538.89

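First, how can I prevent this timeout? Should I simply increase the heartbeat interval?

Second, how can I catch this heartbeat timeout, so that I can reconnect and resubscribe?

Third, why is the on_disconnect function not triggered after the timeout?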
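
A note on the numbers in the log line above: diff_receive is about 6 seconds, which is 1.5 times the configured 4000 ms heartbeat. That matches how I understand stomp.py to work: it tolerates silence for the negotiated receive interval multiplied by a scale factor. The sketch below only reproduces that arithmetic. The function names are made up for illustration, the default scale of 1.5 is my assumption about stomp.py's heart_beat_receive_scale setting, and the server's offered interval is unknown here, so 4000 is used as a placeholder.

```python
def server_heartbeat_interval_ms(client_wants_ms, server_offers_ms):
    """Negotiated server-to-client heartbeat interval per STOMP 1.1/1.2.

    Returns 0 when either side opts out of heartbeats in this direction.
    """
    if client_wants_ms == 0 or server_offers_ms == 0:
        return 0
    return max(client_wants_ms, server_offers_ms)


def receive_timeout_seconds(interval_ms, scale=1.5):
    """Silence tolerated before the client reports a heartbeat timeout.

    ASSUMPTION: scale mirrors stomp.py's heart_beat_receive_scale option.
    """
    return interval_ms / 1000.0 * scale


# The server's offer is a placeholder; the question does not state it.
interval = server_heartbeat_interval_ms(4000, 4000)
print(receive_timeout_seconds(interval))
```

If this reading is right, a larger heartbeat interval or a larger scale factor gives the broker more slack, at the cost of noticing a dead connection later. Check the constructor arguments of your installed stomp.py version before relying on the option name.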

python timeout stomp heartbeat stomp.py
1 Answer

I made the producer check the connection periodically and reconnect whenever the connection is lost. That solved my problem.

while not listener.stop:
        time.sleep(30)
        if check_stop(): # a signal will be given if I want to stop the process
            print('Listener is stopped')
            listener.stop = True
        else:
            if not conn.is_connected():
                print('Producer is disconnected and found by is_connected')
                connect_and_subscribe(conn)
                print('Producer reconnects')
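
As a possible alternative to polling is_connected, the timeout can also be caught in the listener. In the stomp.py versions I know, ConnectionListener calls on_heartbeat_timeout and on_disconnected (with a trailing "ed"), so a method named on_disconnect is never invoked, which may explain the third question. The following is a minimal sketch under that assumption. ReconnectSignalListener and reconnect_if_lost are hypothetical names, and the import fallback exists only so the sketch runs without stomp.py installed.

```python
import threading

try:
    import stomp
    _ListenerBase = stomp.ConnectionListener
except ImportError:
    # Fallback so the sketch can be exercised without stomp.py installed.
    _ListenerBase = object


class ReconnectSignalListener(_ListenerBase):
    """Records a lost connection so the main loop can reconnect."""

    def __init__(self):
        self.connection_lost = threading.Event()

    def on_heartbeat_timeout(self):
        # Invoked when no frame or heartbeat arrived within the allowed time.
        print('Heartbeat timeout detected')
        self.connection_lost.set()

    def on_disconnected(self):
        # Note the name: on_disconnected, not on_disconnect.
        print('Disconnected')
        self.connection_lost.set()


def reconnect_if_lost(listener, conn, connect_and_subscribe):
    """Reconnect from the caller's thread; return True if a reconnect ran."""
    if not listener.connection_lost.is_set():
        return False
    listener.connection_lost.clear()
    connect_and_subscribe(conn)
    return True
```

To try it, register the listener under its own name next to MyListener, for example conn.set_listener('reconnect', signal_listener), and call reconnect_if_lost(signal_listener, conn, connect_and_subscribe) inside the while loop. Setting a flag and reconnecting from the main loop keeps connection work off the library's internal threads, which seemed the safer design choice.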