Stomp.py一次不能处理一条消息。

问题描述 投票:0回答:1

我使用Stomp.py连接到一个标准的ACtiveMQ服务器。我在模拟接收机崩溃的情况,我希望能够重新启动它,并在导致它崩溃的消息后继续运行。

我创建了两个示例脚本。

  • putMessagesToQueue.py - 这将会把56条消息放到目的地
  • readMessagesFromQueue.py - 这将从目的地读取消息。如果读到第6条消息,它将引发一个异常。每条消息的处理时间为1秒。

我采取的步骤是运行测试。

  1. 我运行putMessagesToQueue.py。
  2. 我运行readMessagesFromQueue.py--它成功地处理了5条消息,在消息6中出现了异常。
  3. 我终止了readMessagesFromQueue.py (ctrl-c)
  4. 我再次运行readMessagesFromQueue.py。

对于步骤4中我想要的行为,我希望它从消息7开始处理。

然而我没有看到这一点。如果receever用ack='auto'来订阅,那么在步骤4中,它没有处理任何消息--所有的消息都从队列中消失了,我失去了50条消息!如果我使用ack='client'或者ack='client-individual',那么在步骤4中,它没有处理任何消息。

如果我使用 ack='client' 或 ack='client-individual',那么在第 4 步中,它又从头开始,然后在信息 6 上再次崩溃。

这似乎表明接收机并不是一次处理消息,而是一次处理每一条消息,并运行每一条。我不希望出现这种行为,因为我想扩展到运行5个接收机,而且我希望负载分散。目前,我启动的第一个收件人会接收所有的消息并开始处理它们,而2-4号收件人只是等待新的消息。我想让收信机一次只收一个消息。

谁能给点提示,告诉我怎么做错了。

来源:

putMessagesToQueue.py。

import stomp

stompurl = "127.0.0.1"
stompport = "61613"
stompuser = "admin"
stomppass = "admin"
destination = "/queue/testQueueWithCrash"

conn = stomp.Connection(host_and_ports=[(stompurl, stompport)])
conn.connect(stompuser,stomppass,wait=True)

for x in range(0,5):
  conn.send(body="OK-BEFORE-CRASH", destination=destination)

conn.send(body="CRASH", destination=destination)

for x in range(0,50):
  conn.send(body="OK-AFTER-CRASH", destination=destination)

readMessagesFromQueue.py。

import stomp
import time

stompurl = "127.0.0.1"
stompport = "61613"
stompuser = "admin"
stomppass = "admin"
destination = "/queue/testQueueWithCrash"

conn = stomp.Connection(host_and_ports=[(stompurl, stompport)])
conn.connect(stompuser,stomppass,wait=True)

class StompConnectionListenerClass(stomp.ConnectionListener):
  processMessage = None
  def __init__(self, processMessage):
    self.processMessage = processMessage
  def on_error(self, headers, message):
    print('XX received an error "%s"' % message)
  def on_message(self, headers, message):
    self.processMessage(headers, message)

def messageProcessingFunction(headers, message):
  print('Main recieved a message "%s"' % message)
  if (message=="CRASH"):
    print("Message told processor to crash")
    raise Exception("Reached message which crashes reciever")
  time.sleep(1) # simulate processing message taking time

stompConnectionListener = StompConnectionListenerClass(processMessage=messageProcessingFunction)
conn.set_listener('', stompConnectionListener)

print("Subscribing")
conn.subscribe(destination=destination, id=1, ack='auto')
#conn.subscribe(destination=destination, id=1, ack='client')
#conn.subscribe(destination=destination, id=1, ack='client-individual')

print("Terminate loop starting (Press ctrl+c when you want to exit)")
try:
    while True:
        time.sleep(10)
except KeyboardInterrupt:
    print('interrupted - so exiting!')

conn.close()

print("Reciever terminated")

更新001

我通过修改receive函数,使用ack='client-individual'并手动发送ack消息,获得了上述所需的行为。(见下面的新版本)

但我还是无法让收信人一次处理一条消息。这可以通过下面的步骤来演示。

  1. 我运行putMessagesToQueue.py。
  2. 我运行readMessagesFromQueue2.py - 它将开始处理
  3. 在一个新的终端中运行readMessagesFromQueue2.py。

一开始第二个readMessagesFromQueue2什么都不做,直到第一个实例崩溃,然后它开始接收消息。我希望两个receiver实例都能从一开始就读取消息。

readMessagesFromQueue2.py

import stomp
import time

stompurl = "127.0.0.1"
stompport = "61613"
stompuser = "admin"
stomppass = "admin"
destination = "/queue/testQueueWithCrash"

conn = stomp.Connection(host_and_ports=[(stompurl, stompport)])
conn.connect(stompuser,stomppass,wait=True)

class StompConnectionListenerClass(stomp.ConnectionListener):
  processMessage = None
  conn = None
  def __init__(self, processMessage, conn):
    self.processMessage = processMessage
    self.conn = conn
  def on_error(self, headers, message):
    print('XX received an error "%s"' % message)
  def on_message(self, headers, message):
    try:
      self.processMessage(headers, message)
    finally:
      self.conn.ack(id=headers["message-id"], subscription=headers["subscription"])

def messageProcessingFunction(headers, message):
  print('Main recieved a message "%s"' % message)
  if (message=="CRASH"):
    print("Message told processor to crash")
    raise Exception("Reached message which crashes reciever")
  time.sleep(1) # simulate processing message taking time

stompConnectionListener = StompConnectionListenerClass(processMessage=messageProcessingFunction, conn=conn)
conn.set_listener('', stompConnectionListener)

print("Subscribing")
conn.subscribe(destination=destination, id=1, ack='client-individual')

print("Terminate loop starting (Press ctrl+c when you want to exit)")
try:
    while True:
        time.sleep(10)
except KeyboardInterrupt:
    print('interrupted - so exiting!')

conn.close()

print("Reciever terminated")
python activemq stomp stomp.py
1个回答
1
投票

读了很多不同的文档,我找到了问题所在。

ActiveMQ有一个预取大小的选项--。https:/svn.apache.orgreposinfrawebsitesproductionactivemqcontent5.7.0what-is-the-prefetch-limit-for.html。

如果你有一些需要长时间处理的消息,你可以把它设置为1,这在其他情况下是不合适的。

我可以在stopm.py中用下面这行来实现:conn.subscribe(destination=destination, id=1, ack='client-individual', headers={'activemq.prefetchSize': 1})

所以使用手动或自动ACK既不是这里也不是那里。关键是把预取限制在1。

© www.soinside.com 2019 - 2024. All rights reserved.