我使用Stomp.py连接到一个标准的ACtiveMQ服务器。我在模拟接收机崩溃的情况,我希望能够重新启动它,并在导致它崩溃的消息后继续运行。
我创建了两个示例脚本。
我采取的步骤是运行测试。
对于步骤4中我想要的行为,我希望它从消息7开始处理。
然而我没有看到这一点。如果receever用ack='auto'来订阅,那么在步骤4中,它没有处理任何消息--所有的消息都从队列中消失了,我失去了50条消息!如果我使用ack='client'或者ack='client-individual',那么在步骤4中,它没有处理任何消息。
如果我使用 ack='client' 或 ack='client-individual',那么在第 4 步中,它又从头开始,然后在信息 6 上再次崩溃。
这似乎表明接收机并不是一次处理消息,而是一次处理每一条消息,并运行每一条。我不希望出现这种行为,因为我想扩展到运行5个接收机,而且我希望负载分散。目前,我启动的第一个收件人会接收所有的消息并开始处理它们,而2-4号收件人只是等待新的消息。我想让收信机一次只收一个消息。
谁能给点提示,告诉我怎么做错了。
import stomp
stompurl = "127.0.0.1"
stompport = "61613"
stompuser = "admin"
stomppass = "admin"
destination = "/queue/testQueueWithCrash"
conn = stomp.Connection(host_and_ports=[(stompurl, stompport)])
conn.connect(stompuser,stomppass,wait=True)
for x in range(0,5):
conn.send(body="OK-BEFORE-CRASH", destination=destination)
conn.send(body="CRASH", destination=destination)
for x in range(0,50):
conn.send(body="OK-AFTER-CRASH", destination=destination)
import stomp
import time
stompurl = "127.0.0.1"
stompport = "61613"
stompuser = "admin"
stomppass = "admin"
destination = "/queue/testQueueWithCrash"
conn = stomp.Connection(host_and_ports=[(stompurl, stompport)])
conn.connect(stompuser,stomppass,wait=True)
class StompConnectionListenerClass(stomp.ConnectionListener):
processMessage = None
def __init__(self, processMessage):
self.processMessage = processMessage
def on_error(self, headers, message):
print('XX received an error "%s"' % message)
def on_message(self, headers, message):
self.processMessage(headers, message)
def messageProcessingFunction(headers, message):
print('Main recieved a message "%s"' % message)
if (message=="CRASH"):
print("Message told processor to crash")
raise Exception("Reached message which crashes reciever")
time.sleep(1) # simulate processing message taking time
stompConnectionListener = StompConnectionListenerClass(processMessage=messageProcessingFunction)
conn.set_listener('', stompConnectionListener)
print("Subscribing")
conn.subscribe(destination=destination, id=1, ack='auto')
#conn.subscribe(destination=destination, id=1, ack='client')
#conn.subscribe(destination=destination, id=1, ack='client-individual')
print("Terminate loop starting (Press ctrl+c when you want to exit)")
try:
while True:
time.sleep(10)
except KeyboardInterrupt:
print('interrupted - so exiting!')
conn.close()
print("Reciever terminated")
我通过修改receive函数,使用ack='client-individual'并手动发送ack消息,获得了上述所需的行为。(见下面的新版本)
但我还是无法让收信人一次处理一条消息。这可以通过下面的步骤来演示。
一开始第二个readMessagesFromQueue2什么都不做,直到第一个实例崩溃,然后它开始接收消息。我希望两个receiver实例都能从一开始就读取消息。
import stomp
import time
stompurl = "127.0.0.1"
stompport = "61613"
stompuser = "admin"
stomppass = "admin"
destination = "/queue/testQueueWithCrash"
conn = stomp.Connection(host_and_ports=[(stompurl, stompport)])
conn.connect(stompuser,stomppass,wait=True)
class StompConnectionListenerClass(stomp.ConnectionListener):
processMessage = None
conn = None
def __init__(self, processMessage, conn):
self.processMessage = processMessage
self.conn = conn
def on_error(self, headers, message):
print('XX received an error "%s"' % message)
def on_message(self, headers, message):
try:
self.processMessage(headers, message)
finally:
self.conn.ack(id=headers["message-id"], subscription=headers["subscription"])
def messageProcessingFunction(headers, message):
print('Main recieved a message "%s"' % message)
if (message=="CRASH"):
print("Message told processor to crash")
raise Exception("Reached message which crashes reciever")
time.sleep(1) # simulate processing message taking time
stompConnectionListener = StompConnectionListenerClass(processMessage=messageProcessingFunction, conn=conn)
conn.set_listener('', stompConnectionListener)
print("Subscribing")
conn.subscribe(destination=destination, id=1, ack='client-individual')
print("Terminate loop starting (Press ctrl+c when you want to exit)")
try:
while True:
time.sleep(10)
except KeyboardInterrupt:
print('interrupted - so exiting!')
conn.close()
print("Reciever terminated")
读了很多不同的文档,我找到了问题所在。
ActiveMQ有一个预取大小的选项--。https:/svn.apache.orgreposinfrawebsitesproductionactivemqcontent5.7.0what-is-the-prefetch-limit-for.html。
如果你有一些需要长时间处理的消息,你可以把它设置为1,这在其他情况下是不合适的。
我可以在stopm.py中用下面这行来实现:conn.subscribe(destination=destination, id=1, ack='client-individual', headers={'activemq.prefetchSize': 1})
所以使用手动或自动ACK既不是这里也不是那里。关键是把预取限制在1。