How do I pull all messages from GCloud Pub/Sub?

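I am writing a scheduled Cloud Function that subscribes to a Pub/Sub topic and pulls all messages until the queue is empty. I want to ack some messages but nack others, so that the nacked ones stay in the queue and are pulled the next time the CF runs.

This code waits for messages forever; it should stop once the queue is empty.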

import json
import logging

from google.cloud import pubsub_v1

logger = logging.getLogger(__name__)


def pull_messages():
    subscription_name = ""
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path("", subscription_name)

    def callback(message):
        try:
            data = json.loads(message.data.decode('utf-8'))
            print(f"Received message: {data.get('order_id')}")
            print(f"Received message: {data.get('created_at')}")
            message.ack()
            logger.info("Message acknowledged")
        except Exception as ex:
            logger.error(f"Error processing message: {ex}")

    streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
    logger.info(f"Listening for messages on {subscription_name}...\n")
    try:
        streaming_pull_future.result()
    except Exception as ex:
        streaming_pull_future.cancel()
        logger.error(f"Error in subscription: {ex}")

python google-cloud-platform google-cloud-pubsub publish-subscribe python-3.9

1 Answer

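The notion of "pull all messages until the queue is empty" is not a typical pattern for Cloud Pub/Sub. Usually, you want to keep subscribers up and running at all times. If you nack messages, the queue will never be empty. Nacked messages may be redelivered immediately or, if a retry policy is in use, at the first moment after the backoff time has elapsed.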
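
For the ack-some, nack-others case in the question, a callback might look roughly like the sketch below. Note that should_ack and its order_id rule are hypothetical placeholders, not anything from the original code, and nacking unparseable messages is just one possible choice. The only Pub/Sub calls it relies on are message.data, message.ack() and message.nack().

```python
import json


def should_ack(data) -> bool:
    # Hypothetical rule for illustration only: ack messages that carry an
    # order_id, nack everything else.
    return isinstance(data, dict) and data.get("order_id") is not None


def handle_message(message) -> str:
    """Ack or nack a single message and report which one happened."""
    try:
        data = json.loads(message.data.decode("utf-8"))
    except ValueError:
        # Covers both invalid UTF-8 and invalid JSON. Assumption: such
        # messages are nacked so they get redelivered later.
        message.nack()
        return "nack"
    if should_ack(data):
        message.ack()
        return "ack"
    # Nacked messages stay on the subscription and will be redelivered.
    message.nack()
    return "nack"
```

Passing handle_message as the callback to subscriber.subscribe() should work, since the return value is simply ignored there. Keep in mind that every nack puts the message straight back in line for redelivery, which is why the queue never looks empty.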

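If you must shut down subscribers and re-run them on a schedule, the simplest way to get a notion of "the queue is empty" is to track, in the callback handler, the last time a message was received and to shut down the subscriber once enough time has passed. For example: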

import datetime
import time

from google.cloud import pubsub_v1

project_id = "<my-project>"
subscription_id = "<my-subscription>"
# Number of seconds without receiving messages after which to shut down the
# subscriber
timeout = 300.0
# Interval, in seconds, at which to check whether messages have been received
check_interval = 10.0

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
last_received = datetime.datetime.now()


def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    global last_received
    print(f"Received {message}.")
    last_received = datetime.datetime.now()
    message.ack()


streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

with subscriber:
    # Poll until no message has arrived for `timeout` seconds.
    while datetime.datetime.now() - last_received < datetime.timedelta(seconds=timeout):
        time.sleep(check_interval)
    streaming_pull_future.cancel()  # Trigger the shutdown.
    print("Stopping after no messages received.")
    streaming_pull_future.result()  # Block until the shutdown is complete.

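The amount of time to use depends on how sensitive you are to the possibility of missing messages in the current run and having to wait for the next run if not all messages were delivered. If you plan to nack a relatively small number of messages, you could keep track of them and not reset last_received when a duplicate is received, but that may not be feasible if you nack many messages.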
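
One way to sketch the "don't reset last_received on duplicates" idea is a small helper that remembers the IDs of nacked messages. IdleTracker and its method names are made up for illustration; the sketch assumes the value passed as message_id is the message's message_id attribute, which stays the same across redeliveries. The now parameters exist only so the logic can be exercised without waiting.

```python
import datetime
import threading


class IdleTracker:
    """Tracks the last *new* message; redeliveries of nacked ones are ignored."""

    def __init__(self, timeout_seconds, now=None):
        self.timeout = datetime.timedelta(seconds=timeout_seconds)
        self.last_received = now if now is not None else datetime.datetime.now()
        self.nacked_ids = set()
        # Callbacks run on multiple threads, so guard the shared state.
        self._lock = threading.Lock()

    def record(self, message_id, will_nack, now=None):
        """Call from the callback before acking or nacking the message."""
        now = now if now is not None else datetime.datetime.now()
        with self._lock:
            if message_id in self.nacked_ids:
                # Redelivery of a message we already nacked: not new traffic.
                return
            self.last_received = now
            if will_nack:
                self.nacked_ids.add(message_id)

    def is_idle(self, now=None):
        """True once no new message has arrived for the whole timeout."""
        now = now if now is not None else datetime.datetime.now()
        with self._lock:
            return now - self.last_received >= self.timeout
```

In the example above, the callback would call tracker.record(message.message_id, will_nack=...) instead of assigning last_received, and the while loop would become "while not tracker.is_idle():". The set of nacked IDs grows with every distinct nacked message, which is why this only suits the case where few messages are nacked.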
