如何检查主题队列是否为空,然后终止订户?

问题描述 投票:0回答:1

在我的业务应用程序中,我必须定期对某个主题中的所有消息进行批处理,因为它比以先到先得的方式处理它们便宜。我正在计划执行的当前方法是有一个cronjob,每T小时运行一次订阅者。我当前正在解决的问题是一旦处理完所有消息后如何终止订户。我想每cronjob小时启动一次T,让订阅者使用主题队列中的所有消息并终止。据我了解,没有pub-sub Java API可以告诉我主题队列是否为空。我提出了以下两种解决方案:

  1. 创建一个异步拉取的订户。休眠t minutes时消耗所有消息,然后使用subscriber.stopAsync().awaitTerminated();终止它。在这种方法中,有可能我在终止订户之前可能不会消耗掉所有消息。 Google示例here

  2. 使用Pub/Sub Cloud monitoringfind the value of the metric subscription/num_undelivered_messages。然后使用Google here提供的同步提取示例提取大量消息。然后终止订户。

有更好的方法吗?

谢谢!

google-cloud-platform cloud publish-subscribe google-cloud-pubsub
1个回答
0
投票

一个月前,我在Go中完成了相同的实现。我的假设如下:

  • 如果队列中有消息,则应用程序会很快消耗它们(两次消息之间的间隔少于100毫秒)。
  • 如果队列为空(我的应用已完成使用所有消息的时间,则新消息可能会出现,但会比100ms慢)>
  • 因此,我实现了这一点:*每当我收到一条消息时,*我暂停100ms超时*我处理并确认消息*我将100ms超时重置为0*如果触发了100毫秒超时,我将终止我的请求订阅

    在我的用例中,我每10分钟安排一次处理。因此,我将全局超时设置为9点30分以完成处理,并让新的应用实例继续进行处理

一件棘手的事情:对于第一条消息,将超时设置为2s。实际上,由于建立连接,第一条消息消息花费的时间更长。因此,在初始化超时时设置一个标志“是否是第一条消息”。

如果可以帮助您实现,我可以共享我的Go代码。

© www.soinside.com 2019 - 2024. All rights reserved.