为什么消息再次出现在onMessage()函数中?

问题描述 投票:9回答:4

我正在使用ActiveMQ发送消息。

所以当我发送消息时,消息就会收到消息。成功插入后,即可确认。

但我确认后会有代码,可以抛出NullPointerException

所以为了故意产生这个例外,我抛出了NullPointerException。所以当它这样做时:

消息不是dequeued,同样的消息再次出现在onMessage函数中。

我的代码是:

public void onMessage(Message message) {
    String msg = null;
    try
    {
        msg = receiveMessage(message);

        // Other code to insert message in db

        message.acknowledge();

        if(true)
        {
            throw new NullPointerException("npe"));
        }
            ** // Other code which might produce a null pointer exception **
        }
        catch(Exception ex)
        {
        }
    }

为什么消息再次来到onMessage()功能,因为我也有acknowledge()

因为我已经在db中插入了消息。

队列中的消息是否会在acknowledge()上删除?

我怎么能做到这一点?

java nullpointerexception jms listener activemq
4个回答
2
投票

您可以创建一个单独的方法来处理消息,我的意思是在onMessage()函数中编写代码,只将该消息插入数据库。

并创建一个单独的函数来处理该消息。

因此,如果您在处理过程中遇到任何错误,该消息将不会再次发送到onMessage()


4
投票

您可以将AUTO确认模式与消息列表器一起使用,然后通过规范,如果消息侦听器无法成功返回(例如,如果抛出异常),则会重新传递消息。

在您的情况下,您尝试手动确认消息,但使用createSession(false, Session.AUTO_ACKNOWLEDGE)创建的会话无法做到这一点。

您的代码可以与Session.CLIENT_ACKNOWLEDGE一起使用。

否则,您希望在使用AUTO_ACKNOWLEDGE时捕获onMessage方法中的异常。

要对邮件进行更细粒度的控制,请考虑使用事务处理会话并使用session.commit();确认已读取邮件。


3
投票

你有没有检查过你没有使用transacted sessions?使用事务处理会话时,将忽略确认模式,因此:

  • 你的message.acknowledge()实际上是一个无操作
  • 转义消息监听器时,未捕获的异常将触发“会话回滚”,从而强制重新传递消息。

注意:您发布的代码有一个catch (Exception ex) { },所以我不确切知道您的异常是如何在外面逃脱的。


0
投票

当您使用事务处理的JMS确认模式时,JMS-listener将多次收到您的消息(默认情况下,它在AMQ中大约为8),直到被处理无异常或将被JMS-container移动到DQL队列。有关详细信息,请参阅Message Redelivery and DLQ Handling

管理事务取决于您使用的框架。我更喜欢使用Spring Framework,所以我的Spring XML配置如下所示:

<jms:listener-container container-type="default"
                        connection-factory="calendarConnectionFactory"
                        acknowledge="transacted"
                        destination-type="queue"
                        cache="consumer"
                        concurrency="1-5">
    <jms:listener destination="${jms.calendar.destination}" ref="calendarListener"/>
</jms:listener-container>

我的消息监听器的Java代码是

@Override
@Transactional(propagation = Propagation.REQUIRED,
               noRollbackFor =
                 {ClassCastException.class, IllegalArgumentException.class})
public void onMessage(Message message) {
 ....
}

所以我可以管理哪些异常会回滚事务。

© www.soinside.com 2019 - 2024. All rights reserved.