在我的业务应用程序中,我必须定期对某个主题中的所有消息进行批处理,因为它比以先到先得的方式处理它们便宜。我正在计划执行的当前方法是有一个cronjob
,每T
小时运行一次订阅者。我当前正在解决的问题是一旦处理完所有消息后如何终止订户。我想每cronjob
小时启动一次T
,让订阅者使用主题队列中的所有消息并终止。据我了解,没有pub-sub
Java API可以告诉我主题队列是否为空。我提出了以下两种解决方案:
创建一个异步拉取的订户。休眠t minutes
时消耗所有消息,然后使用subscriber.stopAsync().awaitTerminated();
终止它。在这种方法中,有可能我在终止订户之前可能不会消耗掉所有消息。 Google示例here
使用Pub/Sub Cloud monitoring
至find the value of the metric subscription/num_undelivered_messages
。然后使用Google here提供的同步提取示例提取大量消息。然后终止订户。
有更好的方法吗?
谢谢!
一个月前,我在Go中完成了相同的实现。我的假设如下:
因此,我实现了这一点:*每当我收到一条消息时,*我暂停100ms超时*我处理并确认消息*我将100ms超时重置为0*如果触发了100毫秒超时,我将终止我的请求订阅
在我的用例中,我每10分钟安排一次处理。因此,我将全局超时设置为9点30分以完成处理,并让新的应用实例继续进行处理
一件棘手的事情:对于第一条消息,将超时设置为2s。实际上,由于建立连接,第一条消息消息花费的时间更长。因此,在初始化超时时设置一个标志“是否是第一条消息”。
如果可以帮助您实现,我可以共享我的Go代码。